Tuesday, April 17, 2012

StatsD and Graphite

I have been looking for a monitoring solution for quite a while. Several solutions exist, including Nagios and Ganglia. Ganglia offers a monitoring solution, while Nagios offers alerting as well. Both Ganglia and Nagios seem a little complicated and need considerable effort to configure. To be honest, I am not familiar with Nagios or Ganglia and only gave each of them a quick look.

After reading the post Measure Anything, Measure Everything from Etsy, I decided to give StatsD and Graphite a try. StatsD is very simple to set up and easy to integrate with a system, but Graphite gave me a very hard time. At first I tried to set up Graphite on Ubuntu 8.04, but it ultimately failed because the system's Python is not compatible with py2cairo.

After reinstalling Ubuntu 10.04 server and following the instructions on how to install Graphite on Ubuntu, both StatsD and Graphite started up successfully.

The latest version of Graphite is 0.9.9, and its documentation is sparse. I am looking forward to 1.0, which was planned for release on 2012-01-01 but has been hugely delayed.

StatsD itself is very simple: it is a Node.js daemon, and its stats.js contains only about 300 lines of code. By reviewing the code you can fully understand the concepts introduced in Measure Anything, Measure Everything, especially counters and timers.
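For reference, both metric types are sent as plain UDP text in the form name:value|type, which is exactly what the Java test client further down puts on the wire (the values here are just examples):

ramoncounter:3|c     a counter incremented by 3
ramontimer:27|ms     a timing sample of 27 milliseconds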

Now it is time to run a test.

1. Launch Graphite (by default Graphite is installed to /opt/graphite).
> sudo /etc/init.d/apache2 restart # start apache (graphite-web)
> cd /opt/graphite/
> sudo ./bin/carbon-cache.py start # start carbon to receive incoming stats

2. Launch Statsd
> cd $STATSD_HOME
> sudo cp exampleConfig.js myconfig.js # change the configuration according to your requirement
> sudo node stats.js myconfig.js

3. Run a StatsD client (a simple Java client in this case)
> java Main

import java.util.Random;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;

public class Main{
  public static void main(String args[]) throws Exception{
    DatagramSocket socket = new DatagramSocket();
    while(true) {
      // send counter stats
      int c = new Random().nextInt(10);
      String data = "ramoncounter:" + c + "|c";
      byte[] d = data.getBytes();
      socket.send(new DatagramPacket(d, d.length, InetAddress.getByName("192.168.2.221"), 8125));
      System.out.println("[Send] " + data + "...Sleep 10 seconds!");

      // send timer stats
      data = "";
      data += "ramontimer:" + new Random().nextInt(30) + "|ms";
      d = data.getBytes();
      socket.send(new DatagramPacket(d, d.length, InetAddress.getByName("192.168.2.221"), 8125));
      System.out.println("[Send] " + data + "...Sleep 10 seconds!");

      Thread.sleep(10 * 1000);
    }
  }
}

Now open a web browser and visit http://GRAPHITE_HOST, and you will be welcomed by the Graphite web UI.
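With the default StatsD configuration the counter should show up in the metrics tree under something like stats.ramoncounter (and stats_counts.ramoncounter), and the timer under stats.timers.ramontimer; the exact prefixes depend on your statsd config. You can also request a graph directly through Graphite's render URL, for example:

http://GRAPHITE_HOST/render?target=stats.ramoncounter&from=-1hours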

How does Graphite maintain the incoming data points?

This is a big topic. Graphite includes 3 components:
  1. carbon - a Twisted daemon that listens for time-series data.
  2. whisper - a simple database library for storing time-series data (similar in design to RRD)
  3. graphite webapp - A Django webapp that renders graphs on-demand using Cairo
Whisper is the data storage engine. A whisper file contains one or more archives, each with a specific data resolution and retention (defined by a number of points or a maximum timestamp age). Archives are ordered from the highest-resolution, shortest-retention archive to the lowest-resolution, longest-retention archive.
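These archives are defined in carbon's conf/storage-schemas.conf. For the retentions used throughout this post (10-, 60- and 600-second resolution), the entry looks roughly like the following; the section name and pattern here are only illustrative, and older Graphite versions may also expect a priority line. Each pair is secondsPerPoint:points, and the parseRetentionDef() function near the top of the whisper.py listing below is what parses it:

[ramon]
pattern = ^stats\..*
retentions = 10:2160,60:10080,600:262974

So 10:2160 keeps 6 hours at 10-second resolution, 60:10080 keeps 7 days at 1-minute resolution, and 600:262974 keeps roughly 5 years at 10-minute resolution.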

As the official documentation is so limited and lacks examples, it is hard to really understand how whisper works. I modified whisper.py to output more log messages in order to understand its mechanism.

The main functions of whisper.py are (a short usage sketch follows this list):

  • def create(path,archiveList,xFilesFactor=None,aggregationMethod=None)
  • def update(path,value,timestamp=None)
  • def fetch(path,fromTime,untilTime=None)
  • def info(path) - get header information.
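A minimal sketch of how these functions can be called from a Python shell; the path below is an assumption about where carbon stores the test metric, and the values are placeholders:

import time
import whisper   # the whisper.py module that ships with Graphite

path = '/opt/graphite/storage/whisper/stats/ramoncounter.wsp'   # assumed location
print(whisper.info(path))                    # header: aggregation method, xFilesFactor, archives
whisper.update(path, 0.5)                    # store one data point at the current time
timeInfo, values = whisper.fetch(path, time.time() - 3600)      # last hour of points
print(timeInfo)                              # (fromInterval, untilInterval, step)
print(values)                                # list of floats and Nones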

By running the test client (Main.class), I got a ramoncounter.wsp file which contains all data points for the given metric. Its header information is as below:

maxRetention: 157784400
xFilesFactor: 0.5
aggregationMethod: average
fileSize: 3302620

Archive 0
retention: 21600
secondsPerPoint: 10
points: 2160
size: 25920
offset: 52

Archive 1
retention: 604800
secondsPerPoint: 60
points: 10080
size: 120960
offset: 25972

Archive 2
retention: 157784400
secondsPerPoint: 600
points: 262974
size: 3155688
offset: 146932
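These offsets and the fileSize can be reproduced from the struct formats defined in whisper.py (listed below): the header is the metadata block plus one ArchiveInfo per archive, and each archive then occupies points * pointSize bytes. A quick sanity check:

import struct

metadataSize    = struct.calcsize("!2LfL")   # 16 bytes
archiveInfoSize = struct.calcsize("!3L")     # 12 bytes
pointSize       = struct.calcsize("!Ld")     # 12 bytes per (timestamp, value)

archives = [(10, 2160), (60, 10080), (600, 262974)]   # (secondsPerPoint, points)
offset = metadataSize + archiveInfoSize * len(archives)
for secondsPerPoint, points in archives:
    print(offset)              # prints 52, 25972, 146932 - the offsets above
    offset += points * pointSize
print(offset)                  # 3302620, the reported fileSize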

The amended whisper.py:
  1 #!/usr/bin/env python
  2 # Copyright 2008 Orbitz WorldWide
  3 #
  4 # Licensed under the Apache License, Version 2.0 (the "License");
  5 # you may not use this file except in compliance with the License.
  6 # You may obtain a copy of the License at
  7 #
  8 #   http://www.apache.org/licenses/LICENSE-2.0
  9 #
 10 # Unless required by applicable law or agreed to in writing, software
 11 # distributed under the License is distributed on an "AS IS" BASIS,
 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13 # See the License for the specific language governing permissions and
 14 # limitations under the License.
 15 #
 16 #
 17 # This module is an implementation of the Whisper database API
 18 # Here is the basic layout of a whisper data file
 19 #
 20 # File = Header,Data
 21 #       Header = Metadata,ArchiveInfo+
 22 #               Metadata = aggregationType,maxRetention,xFilesFactor,archiveCount
 23 #               ArchiveInfo = Offset,SecondsPerPoint,Points
 24 #       Data = Archive+
 25 #               Archive = Point+
 26 #                       Point = timestamp,value
 27 
 28 import os, struct, time, logging
 29 
 30 logging.basicConfig(filename="whisper.log",level=logging.DEBUG)
 31 
 32 try:
 33   import fcntl
 34   CAN_LOCK = True
 35 except ImportError:
 36   CAN_LOCK = False
 37 
 38 LOCK = False
 39 CACHE_HEADERS = False
 40 AUTOFLUSH = False
 41 __headerCache = {}
 42 
 43 longFormat = "!L"
 44 longSize = struct.calcsize(longFormat)
 45 floatFormat = "!f"
 46 floatSize = struct.calcsize(floatFormat)
 47 valueFormat = "!d"
 48 valueSize = struct.calcsize(valueFormat)
 49 pointFormat = "!Ld"
 50 pointSize = struct.calcsize(pointFormat)
 51 metadataFormat = "!2LfL"
 52 metadataSize = struct.calcsize(metadataFormat)
 53 archiveInfoFormat = "!3L"
 54 archiveInfoSize = struct.calcsize(archiveInfoFormat)
 55 
 56 aggregationTypeToMethod = dict({
 57   1: 'average',
 58   2: 'sum',
 59   3: 'last',
 60   4: 'max',
 61   5: 'min'
 62 })
 63 aggregationMethodToType = dict([[v,k] for k,v in aggregationTypeToMethod.items()])
 64 aggregationMethods = aggregationTypeToMethod.values()
 65 
 66 debug = startBlock = endBlock = lambda *a,**k: None
 67 
 68 UnitMultipliers = {
 69   's' : 1,
 70   'm' : 60,
 71   'h' : 60 * 60,
 72   'd' : 60 * 60 * 24,
 73   'y' : 60 * 60 * 24 * 365,
 74 }
 75 
 76 
 77 def parseRetentionDef(retentionDef):
 78   (precision, points) = retentionDef.strip().split(':')
 79 
 80   if precision.isdigit():
 81     precisionUnit = 's'
 82     precision = int(precision)
 83   else:
 84     precisionUnit = precision[-1]
 85     precision = int( precision[:-1] )
 86 
 87   if points.isdigit():
 88     pointsUnit = None
 89     points = int(points)
 90   else:
 91     pointsUnit = points[-1]
 92     points = int( points[:-1] )
 93 
 94   if precisionUnit not in UnitMultipliers:
 95     raise ValueError("Invalid unit: '%s'" % precisionUnit)
 96 
 97   if pointsUnit not in UnitMultipliers and pointsUnit is not None:
 98     raise ValueError("Invalid unit: '%s'" % pointsUnit)
 99 
100   precision = precision * UnitMultipliers[precisionUnit]
101 
102   if pointsUnit:
103     points = points * UnitMultipliers[pointsUnit] / precision
104 
105   return (precision, points)
106 
107 class WhisperException(Exception):
108     """Base class for whisper exceptions."""
109 
110 
111 class InvalidConfiguration(WhisperException):
112     """Invalid configuration."""
113 
114 
115 class InvalidAggregationMethod(WhisperException):
116     """Invalid aggregation method."""
117 
118 
119 class InvalidTimeInterval(WhisperException):
120     """Invalid time interval."""
121 
122 
123 class TimestampNotCovered(WhisperException):
124     """Timestamp not covered by any archives in this database."""
125 
126 class CorruptWhisperFile(WhisperException):
127   def __init__(self, error, path):
128     Exception.__init__(self, error)
129     self.error = error
130     self.path = path
131 
132   def __repr__(self):
133     return "<CorruptWhisperFile[%s] %s>" % (self.path, self.error)
134 
135   def __str__(self):
136     return "%s (%s)" % (self.error, self.path)
137 
138 def enableDebug():
139   global open, debug, startBlock, endBlock
140   class open(file):
141     def __init__(self,*args,**kwargs):
142       file.__init__(self,*args,**kwargs)
143       self.writeCount = 0
144       self.readCount = 0
145 
146     def write(self,data):
147       self.writeCount += 1
148       debug('WRITE %d bytes #%d' % (len(data),self.writeCount))
149       return file.write(self,data)
150 
151     def read(self,bytes):
152       self.readCount += 1
153       debug('READ %d bytes #%d' % (bytes,self.readCount))
154       return file.read(self,bytes)
155 
156   def debug(message):
157     print 'DEBUG :: %s' % message
158 
159   __timingBlocks = {}
160 
161   def startBlock(name):
162     __timingBlocks[name] = time.time()
163 
164   def endBlock(name):
165     debug("%s took %.5f seconds" % (name,time.time() - __timingBlocks.pop(name)))
166 
167 
168 def __readHeader(fh):
169   info = __headerCache.get(fh.name)
170   if info:
171     return info
172 
173   originalOffset = fh.tell()
174   fh.seek(0)
175   packedMetadata = fh.read(metadataSize)
176 
177   try:
178     (aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata)
179   except:
180     raise CorruptWhisperFile("Unable to read header", fh.name)
181 
182   archives = []
183 
184   for i in xrange(archiveCount):
185     packedArchiveInfo = fh.read(archiveInfoSize)
186     try:
187       (offset,secondsPerPoint,points) = struct.unpack(archiveInfoFormat,packedArchiveInfo)
188     except:
189       raise CorruptWhisperFile("Unable to read archive %d metadata" % i, fh.name)
190 
191     archiveInfo = {
192       'offset' : offset,
193       'secondsPerPoint' : secondsPerPoint,
194       'points' : points,
195       'retention' : secondsPerPoint * points,
196       'size' : points * pointSize,
197     }
198     archives.append(archiveInfo)
199 
200   fh.seek(originalOffset)
201   info = {
202     'aggregationMethod' : aggregationTypeToMethod.get(aggregationType, 'average'),
203     'maxRetention' : maxRetention,
204     'xFilesFactor' : xff,
205     'archives' : archives,
206   }
207   if CACHE_HEADERS:
208     __headerCache[fh.name] = info
209 
210   return info
211 
212 
213 def setAggregationMethod(path, aggregationMethod):
214   """setAggregationMethod(path,aggregationMethod)
215 
216 path is a string
217 aggregationMethod specifies the method to use when propogating data (see ``whisper.aggregationMethods``)
218 """
219   fh = open(path,'r+b')
220   if LOCK:
221     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
222 
223   packedMetadata = fh.read(metadataSize)
224 
225   try:
226     (aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata)
227   except:
228     raise CorruptWhisperFile("Unable to read header", fh.name)
229 
230   try:
231     newAggregationType = struct.pack( longFormat, aggregationMethodToType[aggregationMethod] )
232   except KeyError:
233     raise InvalidAggregationMethod("Unrecognized aggregation method: %s" %
234           aggregationMethod)
235 
236   fh.seek(0)
237   fh.write(newAggregationType)
238 
239   if AUTOFLUSH:
240     fh.flush()
241     os.fsync(fh.fileno())
242 
243   if CACHE_HEADERS and fh.name in __headerCache:
244     del __headerCache[fh.name]
245 
246   fh.close()
247 
248   return aggregationTypeToMethod.get(aggregationType, 'average')
249 
250 
251 def validateArchiveList(archiveList):
252   """ Validates an archiveList. archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
253   An ArchiveList must:
254   1. Have at least one archive config. Example: (60, 86400) 
255   2. No archive may be a duplicate of another. 
256   3. Higher precision archives' precision must evenly divide all lower precision archives' precision.
257   4. Lower precision archives must cover larger time intervals than higher precision archives.
258 
259   Returns True or False
260   """
261 
262   try:
263     if not archiveList:
264       raise InvalidConfiguration("You must specify at least one archive configuration!")
265 
266     archiveList.sort(key=lambda a: a[0]) #sort by precision (secondsPerPoint)
267 
268     for i,archive in enumerate(archiveList):
269       if i == len(archiveList) - 1:
270         break
271 
272       next = archiveList[i+1]
273       if not (archive[0] < next[0]):
274         raise InvalidConfiguration("You cannot configure two archives "
275           "with the same precision %s,%s" % (archive,next))
276 
277       if (next[0] % archive[0]) != 0:
278         raise InvalidConfiguration("Higher precision archives' precision "
279           "must evenly divide all lower precision archives' precision %s,%s" \
280           % (archive[0],next[0]))
281 
282       retention = archive[0] * archive[1]
283       nextRetention = next[0] * next[1]
284 
285       if not (nextRetention > retention):
286         raise InvalidConfiguration("Lower precision archives must cover "
287           "larger time intervals than higher precision archives %s,%s" \
288           % (archive,next))
289 
290   except:
291         # RAMON: no exceptions will be thrown out???
292     return False
293   return True
294 
295 def create(path,archiveList,xFilesFactor=None,aggregationMethod=None):
296   """create(path,archiveList,xFilesFactor=0.5,aggregationMethod='average')
297 archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
298 path is a string
299 archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
300 xFilesFactor specifies the fraction of data points in a propagation interval that must have known values for a propagation to occur
301 aggregationMethod specifies the function to use when propogating data (see ``whisper.aggregationMethods``)
302 """
303   # Set default params
304   if xFilesFactor is None:
305     xFilesFactor = 0.5
306   if aggregationMethod is None:
307     aggregationMethod = 'average'
308 
309   #Validate archive configurations...
310   validArchive = validateArchiveList(archiveList)
311   if not validArchive:
312     raise InvalidConfiguration("There was a problem creating %s due to an invalid schema config." % path)
313 
314   #Looks good, now we create the file and write the header
315   if os.path.exists(path):
316     raise InvalidConfiguration("File %s already exists!" % path)
317 
318   fh = open(path,'wb')
319   if LOCK:
320     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
321 
322   aggregationType = struct.pack( longFormat, aggregationMethodToType.get(aggregationMethod, 1) )
323   # for example an archive list: [[10,2160],[60,10080],[600,262974]]
324   oldest = sorted([secondsPerPoint * points for secondsPerPoint,points in archiveList])[-1]
325   # the retention of [600,262974] is 600*262974
326   maxRetention = struct.pack( longFormat, oldest )
327   # xFilesFactor = 0.5
328   xFilesFactor = struct.pack( floatFormat, float(xFilesFactor) )
329   # archiveCount = 3
330   archiveCount = struct.pack(longFormat, len(archiveList))
331   packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
332   fh.write(packedMetadata)
333   # !2LfL means L+L+f+L
334   # Metadata    !2LfL   aggregationType,maxRetention,xFilesFactor,archiveCount
335   #     ArchiveInfo     !3L             Offset,SecondsPerPoint,Points
336   headerSize = metadataSize + (archiveInfoSize * len(archiveList))
337   archiveOffsetPointer = headerSize
338 
339   for secondsPerPoint,points in archiveList:
340         # record the start point (offset) of each archive.
341     archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points)
342     fh.write(archiveInfo)
343     archiveOffsetPointer += (points * pointSize)
344 
345   # pre-allocate the disk space for all archives.
346   zeroes = '\x00' * (archiveOffsetPointer - headerSize)
347   fh.write(zeroes)
348 
349   if AUTOFLUSH:
350     fh.flush()
351     os.fsync(fh.fileno())
352 
353   fh.close()
354 
355 def __aggregate(aggregationMethod, knownValues):
356   if aggregationMethod == 'average':
357     return float(sum(knownValues)) / float(len(knownValues))
358   elif aggregationMethod == 'sum':
359     return float(sum(knownValues))
360   elif aggregationMethod == 'last':
361     return knownValues[len(knownValues)-1]
362   elif aggregationMethod == 'max':
363     return max(knownValues)
364   elif aggregationMethod == 'min':
365     return min(knownValues)
366   else:
367     raise InvalidAggregationMethod("Unrecognized aggregation method %s" %
368             aggregationMethod)
369 
370 def __propagate(fh,header,timestamp,higher,lower):
371   aggregationMethod = header['aggregationMethod']
372   xff = header['xFilesFactor']
373 
374   # guarantee the timestamp can be evenly divided by secondsPerPoint...but 
375   # it will lose data precision, right??
376   lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
377   logging.info("timestamp is %d" % timestamp)
378   logging.info("lowerIntervalStart is %d" % lowerIntervalStart)
379   lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']
380   logging.info("lowerIntervalEnd is %d" % lowerIntervalEnd)
381 
382   fh.seek(higher['offset'])
383   packedPoint = fh.read(pointSize)
384   (higherBaseInterval,higherBaseValue) = struct.unpack(pointFormat,packedPoint)
385   logging.info("higherBaseInterval,higherBaseValue is %d,%d" % (higherBaseInterval,higherBaseValue))
386 
387   if higherBaseInterval == 0:
388     higherFirstOffset = higher['offset']
389   else:
390     timeDistance = lowerIntervalStart - higherBaseInterval
391     pointDistance = timeDistance / higher['secondsPerPoint']
392     logging.info("higher['secondsPerPoint'] is %d" % higher['secondsPerPoint'])
393     byteDistance = pointDistance * pointSize
394         # higherFirstOffset: file offset of the higher-archive point whose timestamp is lowerIntervalStart
395     higherFirstOffset = higher['offset'] + (byteDistance % higher['size'])
396     logging.info("higherFirstOffset is %d" % higherFirstOffset)
397 
398   higherPoints = lower['secondsPerPoint'] / higher['secondsPerPoint']
399   higherSize = higherPoints * pointSize
400   relativeFirstOffset = higherFirstOffset - higher['offset']
401   logging.info("relativeFirstOffset is %d" % relativeFirstOffset)
402   relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size']
403   logging.info("relativeLastOffset is %d" % relativeLastOffset)
404   higherLastOffset = relativeLastOffset + higher['offset']
405   logging.info("higherLastOffset is %d" % relativeLastOffset)
406   fh.seek(higherFirstOffset)
407 
408   if higherFirstOffset < higherLastOffset: #we don't wrap the archive
409     seriesString = fh.read(higherLastOffset - higherFirstOffset)
410   else: #We do wrap the archive..round robin
411     higherEnd = higher['offset'] + higher['size']
412     seriesString = fh.read(higherEnd - higherFirstOffset)
413     fh.seek(higher['offset'])
414     seriesString += fh.read(higherLastOffset - higher['offset'])
415 
416   #Now we unpack the series data we just read
417   byteOrder,pointTypes = pointFormat[0],pointFormat[1:]
418   points = len(seriesString) / pointSize
419   logging.info("points is %d" % points)
420   seriesFormat = byteOrder + (pointTypes * points)
421   unpackedSeries = struct.unpack(seriesFormat, seriesString)
422   logging.info("unpackedSeries is %s" % str(unpackedSeries))
423 
424   #And finally we construct a list of values
425   neighborValues = [None] * points
426   currentInterval = lowerIntervalStart
427   logging.info("currentInterval is %d" % currentInterval)
428   step = higher['secondsPerPoint']
429   logging.info("step is %d" % step)
430 
431   # the value of unpackedSeries will like below:
432   # (1334806980, 0.29999999999999999, 1334806990, 0.20000000000000001, 1334828600, 12.0, 1334807010, 0.29999999999999999, 1334807020, 0.5)
433   # and the xrange(0,len(unpackedSeries),2) will return [0,2,4,6...]
434   for i in xrange(0,len(unpackedSeries),2):
435     pointTime = unpackedSeries[i]
436     if pointTime == currentInterval:
437           # what does this mean???...check above comments.
438       neighborValues[i/2] = unpackedSeries[i+1]
439     currentInterval += step
440   logging.info("neighborValues is %s" % str(neighborValues))
441 
442   #Propagate aggregateValue to propagate from neighborValues if we have enough known points
443   knownValues = [v for v in neighborValues if v is not None]
444   if not knownValues:
445     return False
446   logging.info("knownValues is %s" % str(knownValues))
447 
448   knownPercent = float(len(knownValues)) / float(len(neighborValues))
449   logging.info("knownPercent is %f" % knownPercent)
450   logging.info("xff is %f" % xff)
451   if knownPercent >= xff: #we have enough data to propagate a value!
452     logging.info("aggregationMethod is %s" % str(aggregationMethod))
453     aggregateValue = __aggregate(aggregationMethod, knownValues)
454     logging.info("aggregateValue is %f" % aggregateValue)
455     myPackedPoint = struct.pack(pointFormat,lowerIntervalStart,aggregateValue)
456     fh.seek(lower['offset'])
457     packedPoint = fh.read(pointSize)
458     (lowerBaseInterval,lowerBaseValue) = struct.unpack(pointFormat,packedPoint)
459 
460         # create or update
461     if lowerBaseInterval == 0: #First propagated update to this lower archive
462       fh.seek(lower['offset'])
463       fh.write(myPackedPoint)
464     else: #Not our first propagated update to this lower archive
465       timeDistance = lowerIntervalStart - lowerBaseInterval
466       pointDistance = timeDistance / lower['secondsPerPoint']
467       byteDistance = pointDistance * pointSize
468       lowerOffset = lower['offset'] + (byteDistance % lower['size'])
469       fh.seek(lowerOffset)
470       fh.write(myPackedPoint)
471 
472     return True
473 
474   else:
475     return False
476 
477 
478 def update(path,value,timestamp=None):
479   """update(path,value,timestamp=None)
480 
481 path is a string
482 value is a float
483 timestamp is either an int or float
484 """
485   value = float(value)
486   fh = open(path,'r+b')
487   return file_update(fh, value, timestamp)
488 
489 
490 def file_update(fh, value, timestamp):
491   if LOCK:
492     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
493 
494   header = __readHeader(fh)
495   now = int( time.time() )
496   if timestamp is None:
497     timestamp = now
498 
499   timestamp = int(timestamp)
500   diff = now - timestamp
501   logging.info("diff(now - timestamp) is %d" % diff)
502   if not ((diff < header['maxRetention']) and diff >= 0):
503     raise TimestampNotCovered("Timestamp not covered by any archives in "
504       "this database.")
505 
506   # for [[10,2160],[60,10080],[600,262974]], [10,2160] is the highest-precision archive, while 
507   # [600,262974] is the lowest-precision archive...the archive list is sorted from highest-precision
508   # to lowest-precision.
509   for i,archive in enumerate(header['archives']): #Find the highest-precision archive that covers timestamp
510     if archive['retention'] < diff: continue
511     lowerArchives = header['archives'][i+1:] #We'll pass on the update to these lower precision archives later
512     break
513 
514   # The scope of variable 'archive' in for loop is beyond the for loop...a little strange feature!
515   # First we update the highest-precision archive
516   myInterval = timestamp - (timestamp % archive['secondsPerPoint'])
517   myPackedPoint = struct.pack(pointFormat,myInterval,value)
518   fh.seek(archive['offset'])
519   packedPoint = fh.read(pointSize)
520   (baseInterval,baseValue) = struct.unpack(pointFormat,packedPoint)
521 
522   if baseInterval == 0: #This file's first update
523     # seek(offset) will reach the absolute position specified by offset.
524     fh.seek(archive['offset'])
525     fh.write(myPackedPoint)
526     baseInterval,baseValue = myInterval,value
527   else: #Not our first update
528     timeDistance = myInterval - baseInterval
529     pointDistance = timeDistance / archive['secondsPerPoint']
530     byteDistance = pointDistance * pointSize
531         # byteDistance % archive['size'] round-robin
532     myOffset = archive['offset'] + (byteDistance % archive['size'])
533     fh.seek(myOffset)
534     fh.write(myPackedPoint)
535 
536   #Now we propagate the update to lower-precision archives
537   higher = archive
538   logging.info("higher archive:" + str(higher))
539   for lower in lowerArchives:
540     if not __propagate(fh, header, myInterval, higher, lower):
541       break
542     higher = lower
543 
544   if AUTOFLUSH:
545     fh.flush()
546     os.fsync(fh.fileno())
547 
548   fh.close()
549 
550 
551 def update_many(path,points):
552   """update_many(path,points)
553 
554 path is a string
555 points is a list of (timestamp,value) points
556 """
557   if not points: return
558   points = [ (int(t),float(v)) for (t,v) in points]
559   points.sort(key=lambda p: p[0],reverse=True) #order points by timestamp, newest first
560   fh = open(path,'r+b')
561   return file_update_many(fh, points)
562 
563 
564 def file_update_many(fh, points):
565   if LOCK:
566     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
567 
568   header = __readHeader(fh)
569   now = int( time.time() )
570   archives = iter( header['archives'] )
571   currentArchive = archives.next()
572   currentPoints = []
573 
574   for point in points:
575     age = now - point[0]
576 
577     while currentArchive['retention'] < age: #we can't fit any more points in this archive
578       if currentPoints: #commit all the points we've found that it can fit
579         currentPoints.reverse() #put points in chronological order
580         __archive_update_many(fh,header,currentArchive,currentPoints)
581         currentPoints = []
582       try:
583         currentArchive = archives.next()
584       except StopIteration:
585         currentArchive = None
586         break
587 
588     if not currentArchive:
589       break #drop remaining points that don't fit in the database
590 
591     currentPoints.append(point)
592 
593   if currentArchive and currentPoints: #don't forget to commit after we've checked all the archives
594     currentPoints.reverse()
595     __archive_update_many(fh,header,currentArchive,currentPoints)
596 
597   if AUTOFLUSH:
598     fh.flush()
599     os.fsync(fh.fileno())
600 
601   fh.close()
602 
603 
604 def __archive_update_many(fh,header,archive,points):
605   step = archive['secondsPerPoint']
606   alignedPoints = [ (timestamp - (timestamp % step), value)
607                     for (timestamp,value) in points ]
608   #Create a packed string for each contiguous sequence of points
609   packedStrings = []
610   previousInterval = None
611   currentString = ""
612   for (interval,value) in alignedPoints:
613     if (not previousInterval) or (interval == previousInterval + step):
614       currentString += struct.pack(pointFormat,interval,value)
615       previousInterval = interval
616     else:
617       numberOfPoints = len(currentString) / pointSize
618       startInterval = previousInterval - (step * (numberOfPoints-1))
619       packedStrings.append( (startInterval,currentString) )
620       currentString = struct.pack(pointFormat,interval,value)
621       previousInterval = interval
622   if currentString:
623     numberOfPoints = len(currentString) / pointSize
624     startInterval = previousInterval - (step * (numberOfPoints-1))
625     packedStrings.append( (startInterval,currentString) )
626 
627   #Read base point and determine where our writes will start
628   fh.seek(archive['offset'])
629   packedBasePoint = fh.read(pointSize)
630   (baseInterval,baseValue) = struct.unpack(pointFormat,packedBasePoint)
631   if baseInterval == 0: #This file's first update
632     baseInterval = packedStrings[0][0] #use our first string as the base, so we start at the start
633 
634   #Write all of our packed strings in locations determined by the baseInterval
635   for (interval,packedString) in packedStrings:
636     timeDistance = interval - baseInterval
637     pointDistance = timeDistance / step
638     byteDistance = pointDistance * pointSize
639     myOffset = archive['offset'] + (byteDistance % archive['size'])
640     fh.seek(myOffset)
641     archiveEnd = archive['offset'] + archive['size']
642     bytesBeyond = (myOffset + len(packedString)) - archiveEnd
643 
644     if bytesBeyond > 0:
645       fh.write( packedString[:-bytesBeyond] )
646       assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString))
647       fh.seek( archive['offset'] )
648       fh.write( packedString[-bytesBeyond:] ) #safe because it can't exceed the archive (retention checking logic above)
649     else:
650       fh.write(packedString)
651 
652   #Now we propagate the updates to lower-precision archives
653   higher = archive
654   lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']]
655 
656   for lower in lowerArchives:
657     fit = lambda i: i - (i % lower['secondsPerPoint'])
658     lowerIntervals = [fit(p[0]) for p in alignedPoints]
659     uniqueLowerIntervals = set(lowerIntervals)
660     propagateFurther = False
661     for interval in uniqueLowerIntervals:
662       if __propagate(fh, header, interval, higher, lower):
663         propagateFurther = True
664 
665     if not propagateFurther:
666       break
667     higher = lower
668 
669 
670 def info(path):
671   """info(path)
672 
673 path is a string
674 """
675   fh = open(path,'rb')
676   info = __readHeader(fh)
677   fh.close()
678   return info
679 
680 
681 def fetch(path,fromTime,untilTime=None):
682   """fetch(path,fromTime,untilTime=None)
683 
684 path is a string
685 fromTime is an epoch time
686 untilTime is also an epoch time, but defaults to now
687 """
688   fh = open(path,'rb')
689   return file_fetch(fh, fromTime, untilTime)
690 
691 
692 def file_fetch(fh, fromTime, untilTime):
693   header = __readHeader(fh)
694   now = int( time.time() )
695   if untilTime is None:
696     untilTime = now
697   fromTime = int(fromTime)
698   untilTime = int(untilTime)
699 
700   oldestTime = now - header['maxRetention']
701   if fromTime < oldestTime:
702     fromTime = oldestTime
703 
704   if not (fromTime < untilTime):
705     raise InvalidTimeInterval("Invalid time interval")
706   if untilTime > now:
707     untilTime = now
708   if untilTime < fromTime:
709     untilTime = now
710 
711   diff = now - fromTime
712   for archive in header['archives']:
713     if archive['retention'] >= diff:
714       break
715 
716   fromInterval = int( fromTime - (fromTime % archive['secondsPerPoint']) ) + archive['secondsPerPoint']
717   untilInterval = int( untilTime - (untilTime % archive['secondsPerPoint']) ) + archive['secondsPerPoint']
718   fh.seek(archive['offset'])
719   packedPoint = fh.read(pointSize)
720   (baseInterval,baseValue) = struct.unpack(pointFormat,packedPoint)
721 
722   if baseInterval == 0:
723     step = archive['secondsPerPoint']
724     points = (untilInterval - fromInterval) / step
725     timeInfo = (fromInterval,untilInterval,step)
726     valueList = [None] * points
727     return (timeInfo,valueList)
728 
729   #Determine fromOffset
730   timeDistance = fromInterval - baseInterval
731   pointDistance = timeDistance / archive['secondsPerPoint']
732   byteDistance = pointDistance * pointSize
733   fromOffset = archive['offset'] + (byteDistance % archive['size'])
734 
735   #Determine untilOffset
736   timeDistance = untilInterval - baseInterval
737   pointDistance = timeDistance / archive['secondsPerPoint']
738   byteDistance = pointDistance * pointSize
739   untilOffset = archive['offset'] + (byteDistance % archive['size'])
740 
741   #Read all the points in the interval
742   fh.seek(fromOffset)
743   if fromOffset < untilOffset: #If we don't wrap around the archive
744     seriesString = fh.read(untilOffset - fromOffset)
745   else: #We do wrap around the archive, so we need two reads
746     archiveEnd = archive['offset'] + archive['size']
747     seriesString = fh.read(archiveEnd - fromOffset)
748     fh.seek(archive['offset'])
749     seriesString += fh.read(untilOffset - archive['offset'])
750 
751   #Now we unpack the series data we just read (anything faster than unpack?)
752   byteOrder,pointTypes = pointFormat[0],pointFormat[1:]
753   points = len(seriesString) / pointSize
754   seriesFormat = byteOrder + (pointTypes * points)
755   unpackedSeries = struct.unpack(seriesFormat, seriesString)
756 
757   #And finally we construct a list of values (optimize this!)
758   valueList = [None] * points #pre-allocate entire list for speed
759   currentInterval = fromInterval
760   step = archive['secondsPerPoint']
761 
762   for i in xrange(0,len(unpackedSeries),2):
763     pointTime = unpackedSeries[i]
764     if pointTime == currentInterval:
765       pointValue = unpackedSeries[i+1]
766       valueList[i/2] = pointValue #in-place reassignment is faster than append()
767     currentInterval += step
768 
769   fh.close()
770   timeInfo = (fromInterval,untilInterval,step)
771   return (timeInfo,valueList)
772 
773 now = int( time.time() - 24*24*60 )
774 print 'update'
775 update("e:/tmp/ramoncounter-2.wsp", 12, now)
776 
777 

Note that ramoncounter.wsp was created at about 4 PM on 2012/04/18 and kept receiving data points until 10 AM on 2012/04/19, and I ran 'python whisper.py' at about 5 PM on 2012/04/19. The last 3 lines of whisper.py call the update() method, which is the most complicated one. The input timestamp parameter is 'int( time.time() - 24*24*60 )', i.e. 34560 seconds (roughly 9.6 hours) in the past; this backdating is necessary so that the timestamp falls within the period when data points were actually collected, between 4 PM 2012/04/18 and 10 AM 2012/04/19.

Below is the whisper.log:

INFO:root:diff(now - timestamp) is 34560
INFO:root:higher archive:{'retention': 604800, 'secondsPerPoint': 60, 'points': 10080, 'size': 120960, 'offset': 25972}
INFO:root:timestamp is 1334795580
INFO:root:lowerIntervalStart is 1334795400
INFO:root:lowerIntervalEnd is 1334796000
INFO:root:higherBaseInterval,higherBaseValue is 1334745540,0
INFO:root:higher['secondsPerPoint'] is 60
INFO:root:higherFirstOffset is 35944
INFO:root:relativeFirstOffset is 9972
INFO:root:relativeLastOffset is 10092
INFO:root:higherLastOffset is 10092
INFO:root:points is 10
INFO:root:unpackedSeries is (1334795400, 0.51666666666666672, 1334795460, 0.25, 1334795520, 0.40000000000000008, 1334795580, 12.0, 1334795640, 0.46666666666666673, 1334795700, 0.5, 1334795760, 0.31666666666666665, 1334795820, 0.71666666666666667, 1334795880, 0.56666666666666665, 1334795940, 0.5)
INFO:root:currentInterval is 1334795400
INFO:root:step is 60
INFO:root:neighborValues is [0.51666666666666672, 0.25, 0.40000000000000008, 12.0, 0.46666666666666673, 0.5, 0.31666666666666665, 0.71666666666666667, 0.56666666666666665, 0.5]
INFO:root:knownValues is [0.51666666666666672, 0.25, 0.40000000000000008, 12.0, 0.46666666666666673, 0.5, 0.31666666666666665, 0.71666666666666667, 0.56666666666666665, 0.5]
INFO:root:knownPercent is 1.000000
INFO:root:xff is 0.500000
INFO:root:aggregationMethod is average
INFO:root:aggregateValue is 1.623333

It is important to understand how whisper updates a data point and propagates the update to lower-precision archives.
Based on the log, you can see that the first archive selected is 'archive:{'retention': 604800, 'secondsPerPoint': 60, 'points': 10080, 'size': 120960, 'offset': 25972}', not 'archive:{'retention': 21600, 'secondsPerPoint': 10, 'points': 2160, 'size': 25920, 'offset': 52}'. Why?

We ran 'python whisper.py' at about 5 PM on 2012/04/19, and the timestamp of the incoming data point is 'int( time.time() - 24*24*60 )', so the point is 34560 seconds (about 9.6 hours) in the past, as the first line of the log shows. That distance is greater than the retention of the highest-precision archive (secondsPerPoint: 10, retention: 21600 seconds), so the next archive is selected. Check the update() code for the details.
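In code form, the selection inside update() boils down to picking the first archive whose retention covers the age of the point. A tiny standalone sketch using the numbers from the log above (it also relies on the loop variable surviving the loop, the same "strange feature" noted in the listing):

archives = [
    {'secondsPerPoint': 10,  'retention': 21600},      # 6 hours
    {'secondsPerPoint': 60,  'retention': 604800},     # 7 days
    {'secondsPerPoint': 600, 'retention': 157784400},  # ~5 years
]
diff = 34560   # now - timestamp, as reported in whisper.log
for archive in archives:
    if archive['retention'] < diff:
        continue           # the point is too old for this archive, try the next one
    break
print(archive)             # -> the 60-second archive, matching the log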

How does propagation occur?
It is easiest to explain with an example (the official Graphite documentation really lacks these).
If the input data point is "1334795700, 0.5", whisper will first update the archive with secondsPerPoint:10 and then propagate to the other 2 lower-precision archives. Let's focus on how it updates the archive with secondsPerPoint:60 (a small sketch follows the steps below).
1) It first aligns the timestamp to the lower archive's interval:

lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])

2) Find the position of that interval (lowerIntervalStart) in the archive with secondsPerPoint:10.
3) Read the next 6 data points from the archive with secondsPerPoint:10. Why 6 points? secondsPerPoint of the lower archive / secondsPerPoint of the higher archive = 60/10 = 6.
4) Apply the aggregation method to those 6 data points.
5) Write the aggregated value, at timestamp lowerIntervalStart, into the archive with secondsPerPoint:60.
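A minimal sketch of these five steps in plain Python; the six 10-second values are hypothetical, and only the interval arithmetic and aggregation are shown (the real __propagate() above also computes byte offsets into the file):

timestamp = 1334795700
higher_spp, lower_spp = 10, 60
xff = 0.5

# step 1: align the timestamp to the lower archive's interval
lowerIntervalStart = timestamp - (timestamp % lower_spp)     # 1334795700

# steps 2-3: read lower_spp / higher_spp = 6 points from the 10-second archive
neighborValues = [0.5, 0.3, None, 0.7, 0.4, 0.6]              # hypothetical values

# step 4: aggregate if enough points are known (the xFilesFactor check)
knownValues = [v for v in neighborValues if v is not None]
if float(len(knownValues)) / len(neighborValues) >= xff:
    aggregateValue = sum(knownValues) / len(knownValues)      # 'average' method
    # step 5: (lowerIntervalStart, aggregateValue) is packed with struct and
    # written into the 60-second archive at the slot derived from lowerIntervalStart
    print(lowerIntervalStart, aggregateValue)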

---------------------------------------------------
Some terminology of Graphite's whisper (a round-robin database)
- data point: A float value paired with a timestamp in seconds since the UNIX Epoch (01-01-1970).
- resolution: The number of seconds per data point. For example, in our test statsd flushes the counter ('ramoncounter') every 10 seconds, so from the perspective of whisper it receives 1 data point per 10 seconds and the resolution is 10 seconds. A resolution of 10 seconds is higher than a resolution of 60 seconds.

