I have been looking for a monitoring solution for quite a while. Several options exist, including
Nagios and
Ganglia. Ganglia focuses on monitoring, while Nagios also provides alerting. Both seem a little complicated and need considerable effort to configure. To be honest, I am not familiar with either of them and have only given each a quick look.
After reading the post
Measure Anything, Measure Everything from Etsy, I decided to give
statsd and
Graphite a try. Statsd is very simple to set up and easy to integrate with an existing system, but Graphite gave me a really hard time. At first I tried to set up Graphite on
Ubuntu 8.04, but it ultimately failed because the system Python is not compatible with
py2cairo.
After reinstalling with Ubuntu 10.04 server and following the instructions in
how to install graphite on ubuntu, both statsd and Graphite started up successfully.
The latest version of Graphite is 0.9.9 and its documentation is thin; I am looking forward to 1.0, which was planned for release on 2012-01-01 but has been hugely delayed.
Statsd itself is very simple: it is a node.js daemon, and its stats.js contains only about 300 lines of code. By reviewing the code, you can fully understand the concepts introduced in
Measure Anything, Measure Everything, especially counters and timings.
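For reference, what a client actually sends is plain text over UDP: a line like ramoncounter:3|c adds 3 to the counter named 'ramoncounter', and ramontimer:27|ms records a 27 millisecond timing sample (those metric names are just the ones I use in the test below). On every flush interval (10 seconds in the example config) statsd aggregates whatever it has received and forwards the results to Graphite.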
Now it is time to run a test.
1. Launch Graphite (by default Graphite is installed to /opt/graphite).
> sudo /etc/init.d/apache2 restart # start apache (graphite-web)
> cd /opt/graphite/
> sudo ./bin/carbon-cache.py start # start carbon to receive incoming stats
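To confirm that carbon actually came up (optional, and assuming the default setup), you can ask the daemon for its status and check that it is listening on TCP port 2003:
> sudo ./bin/carbon-cache.py status
> netstat -nlt | grep 2003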
2. Launch Statsd
> cd $STATSD_HOME
> sudo cp exampleConfig.js myconfig.js # change the configuration according to your requirement
> sudo node stats.js myconfig.js
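In myconfig.js the keys that matter for this setup (the names follow statsd's exampleConfig.js) are graphiteHost, which should point at the machine running carbon-cache; graphitePort, which is 2003, carbon's plaintext listener; and port, the UDP port statsd listens on, 8125 by default, which the test client below targets.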
3. Run a statsd client (there is actually a ready-made
java client, but the throwaway test client below is enough for this experiment). Compile and run it:
> javac Main.java
> java Main
import java.util.Random;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;

public class Main {
    public static void main(String args[]) throws Exception {
        DatagramSocket socket = new DatagramSocket();
        while (true) {
            int c = new Random().nextInt(10);
            String data = "ramoncounter:" + c + "|c";
            byte[] d = data.getBytes();
            socket.send(new DatagramPacket(d, d.length, InetAddress.getByName("192.168.2.221"), 8125));
            System.out.println("[Send] " + data + "...Sleep 10 seconds!");

            data = "";
            data += "ramontimer:" + new Random().nextInt(30) + "|ms";
            d = data.getBytes();
            socket.send(new DatagramPacket(d, d.length, InetAddress.getByName("192.168.2.221"), 8125));
            System.out.println("[Send] " + data + "...Sleep 10 seconds!");

            Thread.sleep(10 * 1000);
        }
    }
}
Now open a web browser and access http://GRAPHITE_HOST; you will be welcomed by the Graphite web UI.
How does Graphite maintain the incoming data points?
This is a big topic. Graphite consists of 3 components:
- carbon - a Twisted daemon that listens for time-series data (a sketch of its plaintext protocol follows this list).
- whisper - a simple database library for storing time-series data (similar in design to RRD)
- graphite webapp - A Django webapp that renders graphs on-demand using Cairo
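Statsd is not doing anything magical when it forwards metrics to carbon: by default carbon's plaintext listener accepts lines of the form "metric-path value timestamp" on TCP port 2003. A minimal sketch of feeding carbon directly (the metric name is made up, and carbon-cache is assumed to be running locally with default ports):

import socket, time

sock = socket.create_connection(("127.0.0.1", 2003))    # carbon's plaintext listener
line = "test.manual.metric 42 %d\n" % int(time.time())  # "metric-path value timestamp"
sock.sendall(line.encode('ascii'))
sock.close()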
Whisper is the data storage engine. A whisper file contains one or more archives, each with a specific data resolution and retention (defined in number of points or max timestamp age), and the archives are ordered from the highest-resolution, shortest-retention archive to the lowest-resolution, longest-retention one.
Since the official documentation is so limited and lacks examples, it is hard to really understand how whisper works, so I modified whisper.py to output more log messages and watched its mechanism.
The main functions of whisper.py are listed below (a small usage sketch follows the list):
- def create(path,archiveList,xFilesFactor=None,aggregationMethod=None)
- def update(path,value,timestamp=None)
- def fetch(path,fromTime,untilTime=None)
- def info(path) - get header information.
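To make the roles of these functions concrete, here is a minimal sketch run against a scratch file. It assumes the stock whisper.py that ships with Graphite (under /opt/graphite/lib on this install) is on the Python path; the archive list mirrors the one my ramoncounter.wsp ended up with, and the file path is made up:

import time
import whisper  # /opt/graphite/lib/whisper.py

path = '/tmp/demo.wsp'

# 10-second points for 6 hours, 1-minute points for 7 days, 10-minute points for ~5 years
whisper.create(path, [(10, 2160), (60, 10080), (600, 262974)],
               xFilesFactor=0.5, aggregationMethod='average')

whisper.update(path, 42.0)                 # one data point stamped "now"
print(whisper.info(path)['maxRetention'])  # 157784400, matching the dump below

(fromInterval, untilInterval, step), values = whisper.fetch(path, time.time() - 3600)
print(step)         # 10 -- the highest-precision archive covers the last hour
print(values[-5:])  # mostly None, plus the 42.0 we just stored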
By running the test client (Main.class) for a while, I got a
ramoncounter.wsp file which contains all data points for the given metric. Its header information is as below:
maxRetention: 157784400
xFilesFactor: 0.5
aggregationMethod: average
fileSize: 3302620
Archive 0
retention: 21600
secondsPerPoint: 10
points: 2160
size: 25920
offset: 52
Archive 1
retention: 604800
secondsPerPoint: 60
points: 10080
size: 120960
offset: 25972
Archive 2
retention: 157784400
secondsPerPoint: 600
points: 262974
size: 3155688
offset: 146932
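These three archives are exactly what a retention definition of 10:2160,60:10080,600:262974 in storage-schemas.conf produces: 10-second points kept for 6 hours, 1-minute points for 7 days, and 10-minute points for roughly 5 years. The numbers in the dump also fit together: each point occupies 12 bytes (pointFormat "!Ld", a 4-byte interval plus an 8-byte value), and the header is 16 bytes of metadata plus 3 x 12 bytes of archive info = 52 bytes. So Archive 0 starts at offset 52 and takes 2160 x 12 = 25920 bytes, Archive 1 starts at 52 + 25920 = 25972 and takes 120960 bytes, Archive 2 starts at 25972 + 120960 = 146932 and takes 3155688 bytes, giving the total fileSize of 146932 + 3155688 = 3302620 bytes.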
The amended whisper.py:
import os, struct, time, logging

logging.basicConfig(filename="whisper.log",level=logging.DEBUG)

try:
  import fcntl
  CAN_LOCK = True
except ImportError:
  CAN_LOCK = False

LOCK = False
CACHE_HEADERS = False
AUTOFLUSH = False
__headerCache = {}

longFormat = "!L"
longSize = struct.calcsize(longFormat)
floatFormat = "!f"
floatSize = struct.calcsize(floatFormat)
valueFormat = "!d"
valueSize = struct.calcsize(valueFormat)
pointFormat = "!Ld"
pointSize = struct.calcsize(pointFormat)
metadataFormat = "!2LfL"
metadataSize = struct.calcsize(metadataFormat)
archiveInfoFormat = "!3L"
archiveInfoSize = struct.calcsize(archiveInfoFormat)

aggregationTypeToMethod = dict({
  1: 'average',
  2: 'sum',
  3: 'last',
  4: 'max',
  5: 'min'
})
aggregationMethodToType = dict([[v,k] for k,v in aggregationTypeToMethod.items()])
aggregationMethods = aggregationTypeToMethod.values()

debug = startBlock = endBlock = lambda *a,**k: None

UnitMultipliers = {
  's' : 1,
  'm' : 60,
  'h' : 60 * 60,
  'd' : 60 * 60 * 24,
  'y' : 60 * 60 * 24 * 365,
}


def parseRetentionDef(retentionDef):
  (precision, points) = retentionDef.strip().split(':')

  if precision.isdigit():
    precisionUnit = 's'
    precision = int(precision)
  else:
    precisionUnit = precision[-1]
    precision = int( precision[:-1] )

  if points.isdigit():
    pointsUnit = None
    points = int(points)
  else:
    pointsUnit = points[-1]
    points = int( points[:-1] )

  if precisionUnit not in UnitMultipliers:
    raise ValueError("Invalid unit: '%s'" % precisionUnit)

  if pointsUnit not in UnitMultipliers and pointsUnit is not None:
    raise ValueError("Invalid unit: '%s'" % pointsUnit)

  precision = precision * UnitMultipliers[precisionUnit]

  if pointsUnit:
    points = points * UnitMultipliers[pointsUnit] / precision

  return (precision, points)

class WhisperException(Exception):
  """Base class for whisper exceptions."""


class InvalidConfiguration(WhisperException):
  """Invalid configuration."""


class InvalidAggregationMethod(WhisperException):
  """Invalid aggregation method."""


class InvalidTimeInterval(WhisperException):
  """Invalid time interval."""


class TimestampNotCovered(WhisperException):
  """Timestamp not covered by any archives in this database."""

class CorruptWhisperFile(WhisperException):
  def __init__(self, error, path):
    Exception.__init__(self, error)
    self.error = error
    self.path = path

  def __repr__(self):
    return "<CorruptWhisperFile[%s] %s>" % (self.path, self.error)

  def __str__(self):
    return "%s (%s)" % (self.error, self.path)

def enableDebug():
  global open, debug, startBlock, endBlock
  class open(file):
    def __init__(self,*args,**kwargs):
      file.__init__(self,*args,**kwargs)
      self.writeCount = 0
      self.readCount = 0

    def write(self,data):
      self.writeCount += 1
      debug('WRITE %d bytes #%d' % (len(data),self.writeCount))
      return file.write(self,data)

    def read(self,bytes):
      self.readCount += 1
      debug('READ %d bytes #%d' % (bytes,self.readCount))
      return file.read(self,bytes)

  def debug(message):
    print 'DEBUG :: %s' % message

  __timingBlocks = {}

  def startBlock(name):
    __timingBlocks[name] = time.time()

  def endBlock(name):
    debug("%s took %.5f seconds" % (name,time.time() - __timingBlocks.pop(name)))

def __readHeader(fh):
  info = __headerCache.get(fh.name)
  if info:
    return info

  originalOffset = fh.tell()
  fh.seek(0)
  packedMetadata = fh.read(metadataSize)

  try:
    (aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata)
  except:
    raise CorruptWhisperFile("Unable to read header", fh.name)

  archives = []

  for i in xrange(archiveCount):
    packedArchiveInfo = fh.read(archiveInfoSize)
    try:
      (offset,secondsPerPoint,points) = struct.unpack(archiveInfoFormat,packedArchiveInfo)
    except:
      raise CorruptWhisperFile("Unable to read archive %d metadata" % i, fh.name)

    archiveInfo = {
      'offset' : offset,
      'secondsPerPoint' : secondsPerPoint,
      'points' : points,
      'retention' : secondsPerPoint * points,
      'size' : points * pointSize,
    }
    archives.append(archiveInfo)

  fh.seek(originalOffset)
  info = {
    'aggregationMethod' : aggregationTypeToMethod.get(aggregationType, 'average'),
    'maxRetention' : maxRetention,
    'xFilesFactor' : xff,
    'archives' : archives,
  }
  if CACHE_HEADERS:
    __headerCache[fh.name] = info

  return info

def setAggregationMethod(path, aggregationMethod):
  """setAggregationMethod(path,aggregationMethod)

  path is a string
  aggregationMethod specifies the method to use when propagating data (see ``whisper.aggregationMethods``)
  """
  fh = open(path,'r+b')
  if LOCK:
    fcntl.flock( fh.fileno(), fcntl.LOCK_EX )

  packedMetadata = fh.read(metadataSize)

  try:
    (aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata)
  except:
    raise CorruptWhisperFile("Unable to read header", fh.name)

  try:
    newAggregationType = struct.pack( longFormat, aggregationMethodToType[aggregationMethod] )
  except KeyError:
    raise InvalidAggregationMethod("Unrecognized aggregation method: %s" %
                                   aggregationMethod)

  fh.seek(0)
  fh.write(newAggregationType)

  if AUTOFLUSH:
    fh.flush()
    os.fsync(fh.fileno())

  if CACHE_HEADERS and fh.name in __headerCache:
    del __headerCache[fh.name]

  fh.close()

  return aggregationTypeToMethod.get(aggregationType, 'average')

def validateArchiveList(archiveList):
  """ Validates an archiveList. archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
  An ArchiveList must:
  1. Have at least one archive config. Example: (60, 86400)
  2. No archive may be a duplicate of another.
  3. Higher precision archives' precision must evenly divide all lower precision archives' precision.
  4. Lower precision archives must cover larger time intervals than higher precision archives.

  Returns True or False
  """

  try:
    if not archiveList:
      raise InvalidConfiguration("You must specify at least one archive configuration!")

    archiveList.sort(key=lambda a: a[0])

    for i,archive in enumerate(archiveList):
      if i == len(archiveList) - 1:
        break

      next = archiveList[i+1]
      if not (archive[0] < next[0]):
        raise InvalidConfiguration("You cannot configure two archives "
          "with the same precision %s,%s" % (archive,next))

      if (next[0] % archive[0]) != 0:
        raise InvalidConfiguration("Higher precision archives' precision "
          "must evenly divide all lower precision archives' precision %s,%s" \
          % (archive[0],next[0]))

      retention = archive[0] * archive[1]
      nextRetention = next[0] * next[1]

      if not (nextRetention > retention):
        raise InvalidConfiguration("Lower precision archives must cover "
          "larger time intervals than higher precision archives %s,%s" \
          % (archive,next))

  except:
    return False
  return True

def create(path,archiveList,xFilesFactor=None,aggregationMethod=None):
  """create(path,archiveList,xFilesFactor=0.5,aggregationMethod='average')

  path is a string
  archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
  xFilesFactor specifies the fraction of data points in a propagation interval that must have known values for a propagation to occur
  aggregationMethod specifies the function to use when propagating data (see ``whisper.aggregationMethods``)
  """

  if xFilesFactor is None:
    xFilesFactor = 0.5
  if aggregationMethod is None:
    aggregationMethod = 'average'

  validArchive = validateArchiveList(archiveList)
  if not validArchive:
    raise InvalidConfiguration("There was a problem creating %s due to an invalid schema config." % path)

  if os.path.exists(path):
    raise InvalidConfiguration("File %s already exists!" % path)

  fh = open(path,'wb')
  if LOCK:
    fcntl.flock( fh.fileno(), fcntl.LOCK_EX )

  aggregationType = struct.pack( longFormat, aggregationMethodToType.get(aggregationMethod, 1) )

  oldest = sorted([secondsPerPoint * points for secondsPerPoint,points in archiveList])[-1]

  maxRetention = struct.pack( longFormat, oldest )

  xFilesFactor = struct.pack( floatFormat, float(xFilesFactor) )

  archiveCount = struct.pack(longFormat, len(archiveList))
  packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
  fh.write(packedMetadata)

  headerSize = metadataSize + (archiveInfoSize * len(archiveList))
  archiveOffsetPointer = headerSize

  for secondsPerPoint,points in archiveList:
    archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points)
    fh.write(archiveInfo)
    archiveOffsetPointer += (points * pointSize)

  zeroes = '\x00' * (archiveOffsetPointer - headerSize)
  fh.write(zeroes)

  if AUTOFLUSH:
    fh.flush()
    os.fsync(fh.fileno())

  fh.close()

def __aggregate(aggregationMethod, knownValues):
  if aggregationMethod == 'average':
    return float(sum(knownValues)) / float(len(knownValues))
  elif aggregationMethod == 'sum':
    return float(sum(knownValues))
  elif aggregationMethod == 'last':
    return knownValues[len(knownValues)-1]
  elif aggregationMethod == 'max':
    return max(knownValues)
  elif aggregationMethod == 'min':
    return min(knownValues)
  else:
    raise InvalidAggregationMethod("Unrecognized aggregation method %s" %
                                   aggregationMethod)

def __propagate(fh,header,timestamp,higher,lower):
  aggregationMethod = header['aggregationMethod']
  xff = header['xFilesFactor']

  lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
  logging.info("timestamp is %d" % timestamp)
  logging.info("lowerIntervalStart is %d" % lowerIntervalStart)
  lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']
  logging.info("lowerIntervalEnd is %d" % lowerIntervalEnd)

  fh.seek(higher['offset'])
  packedPoint = fh.read(pointSize)
  (higherBaseInterval,higherBaseValue) = struct.unpack(pointFormat,packedPoint)
  logging.info("higherBaseInterval,higherBaseValue is %d,%d" % (higherBaseInterval,higherBaseValue))

  if higherBaseInterval == 0:
    higherFirstOffset = higher['offset']
  else:
    timeDistance = lowerIntervalStart - higherBaseInterval
    pointDistance = timeDistance / higher['secondsPerPoint']
    logging.info("higher['secondsPerPoint'] is %d" % higher['secondsPerPoint'])
    byteDistance = pointDistance * pointSize

    higherFirstOffset = higher['offset'] + (byteDistance % higher['size'])
    logging.info("higherFirstOffset is %d" % higherFirstOffset)

  higherPoints = lower['secondsPerPoint'] / higher['secondsPerPoint']
  higherSize = higherPoints * pointSize
  relativeFirstOffset = higherFirstOffset - higher['offset']
  logging.info("relativeFirstOffset is %d" % relativeFirstOffset)
  relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size']
  logging.info("relativeLastOffset is %d" % relativeLastOffset)
  higherLastOffset = relativeLastOffset + higher['offset']
  logging.info("higherLastOffset is %d" % relativeLastOffset)
  fh.seek(higherFirstOffset)

  if higherFirstOffset < higherLastOffset:
    seriesString = fh.read(higherLastOffset - higherFirstOffset)
  else:
    higherEnd = higher['offset'] + higher['size']
    seriesString = fh.read(higherEnd - higherFirstOffset)
    fh.seek(higher['offset'])
    seriesString += fh.read(higherLastOffset - higher['offset'])

  byteOrder,pointTypes = pointFormat[0],pointFormat[1:]
  points = len(seriesString) / pointSize
  logging.info("points is %d" % points)
  seriesFormat = byteOrder + (pointTypes * points)
  unpackedSeries = struct.unpack(seriesFormat, seriesString)
  logging.info("unpackedSeries is %s" % str(unpackedSeries))

  neighborValues = [None] * points
  currentInterval = lowerIntervalStart
  logging.info("currentInterval is %d" % currentInterval)
  step = higher['secondsPerPoint']
  logging.info("step is %d" % step)

  for i in xrange(0,len(unpackedSeries),2):
    pointTime = unpackedSeries[i]
    if pointTime == currentInterval:
      neighborValues[i/2] = unpackedSeries[i+1]
    currentInterval += step
  logging.info("neighborValues is %s" % str(neighborValues))

  knownValues = [v for v in neighborValues if v is not None]
  if not knownValues:
    return False
  logging.info("knownValues is %s" % str(knownValues))

  knownPercent = float(len(knownValues)) / float(len(neighborValues))
  logging.info("knownPercent is %f" % knownPercent)
  logging.info("xff is %f" % xff)
  if knownPercent >= xff:
    logging.info("aggregationMethod is %s" % str(aggregationMethod))
    aggregateValue = __aggregate(aggregationMethod, knownValues)
    logging.info("aggregateValue is %f" % aggregateValue)
    myPackedPoint = struct.pack(pointFormat,lowerIntervalStart,aggregateValue)
    fh.seek(lower['offset'])
    packedPoint = fh.read(pointSize)
    (lowerBaseInterval,lowerBaseValue) = struct.unpack(pointFormat,packedPoint)

    if lowerBaseInterval == 0:
      fh.seek(lower['offset'])
      fh.write(myPackedPoint)
    else:
      timeDistance = lowerIntervalStart - lowerBaseInterval
      pointDistance = timeDistance / lower['secondsPerPoint']
      byteDistance = pointDistance * pointSize
      lowerOffset = lower['offset'] + (byteDistance % lower['size'])
      fh.seek(lowerOffset)
      fh.write(myPackedPoint)

    return True

  else:
    return False

def update(path,value,timestamp=None):
  """update(path,value,timestamp=None)

  path is a string
  value is a float
  timestamp is either an int or float
  """
  value = float(value)
  fh = open(path,'r+b')
  return file_update(fh, value, timestamp)


def file_update(fh, value, timestamp):
  if LOCK:
    fcntl.flock( fh.fileno(), fcntl.LOCK_EX )

  header = __readHeader(fh)
  now = int( time.time() )
  if timestamp is None:
    timestamp = now

  timestamp = int(timestamp)
  diff = now - timestamp
  logging.info("diff(now - timestamp) is %d" % diff)
  if not ((diff < header['maxRetention']) and diff >= 0):
    raise TimestampNotCovered("Timestamp not covered by any archives in "
                              "this database.")

  for i,archive in enumerate(header['archives']):
    if archive['retention'] < diff: continue
    lowerArchives = header['archives'][i+1:]
    break

  myInterval = timestamp - (timestamp % archive['secondsPerPoint'])
  myPackedPoint = struct.pack(pointFormat,myInterval,value)
  fh.seek(archive['offset'])
  packedPoint = fh.read(pointSize)
  (baseInterval,baseValue) = struct.unpack(pointFormat,packedPoint)

  if baseInterval == 0:
    fh.seek(archive['offset'])
    fh.write(myPackedPoint)
    baseInterval,baseValue = myInterval,value
  else:
    timeDistance = myInterval - baseInterval
    pointDistance = timeDistance / archive['secondsPerPoint']
    byteDistance = pointDistance * pointSize

    myOffset = archive['offset'] + (byteDistance % archive['size'])
    fh.seek(myOffset)
    fh.write(myPackedPoint)

  higher = archive
  logging.info("higher archive:" + str(higher))
  for lower in lowerArchives:
    if not __propagate(fh, header, myInterval, higher, lower):
      break
    higher = lower

  if AUTOFLUSH:
    fh.flush()
    os.fsync(fh.fileno())

  fh.close()

def update_many(path,points):
  """update_many(path,points)

  path is a string
  points is a list of (timestamp,value) points
  """
  if not points: return
  points = [ (int(t),float(v)) for (t,v) in points]
  points.sort(key=lambda p: p[0],reverse=True)
  fh = open(path,'r+b')
  return file_update_many(fh, points)


def file_update_many(fh, points):
  if LOCK:
    fcntl.flock( fh.fileno(), fcntl.LOCK_EX )

  header = __readHeader(fh)
  now = int( time.time() )
  archives = iter( header['archives'] )
  currentArchive = archives.next()
  currentPoints = []

  for point in points:
    age = now - point[0]

    while currentArchive['retention'] < age:
      if currentPoints:
        currentPoints.reverse()
        __archive_update_many(fh,header,currentArchive,currentPoints)
        currentPoints = []
      try:
        currentArchive = archives.next()
      except StopIteration:
        currentArchive = None
        break

    if not currentArchive:
      break

    currentPoints.append(point)

  if currentArchive and currentPoints:
    currentPoints.reverse()
    __archive_update_many(fh,header,currentArchive,currentPoints)

  if AUTOFLUSH:
    fh.flush()
    os.fsync(fh.fileno())

  fh.close()

def __archive_update_many(fh,header,archive,points):
  step = archive['secondsPerPoint']
  alignedPoints = [ (timestamp - (timestamp % step), value)
                    for (timestamp,value) in points ]

  packedStrings = []
  previousInterval = None
  currentString = ""
  for (interval,value) in alignedPoints:
    if (not previousInterval) or (interval == previousInterval + step):
      currentString += struct.pack(pointFormat,interval,value)
      previousInterval = interval
    else:
      numberOfPoints = len(currentString) / pointSize
      startInterval = previousInterval - (step * (numberOfPoints-1))
      packedStrings.append( (startInterval,currentString) )
      currentString = struct.pack(pointFormat,interval,value)
      previousInterval = interval
  if currentString:
    numberOfPoints = len(currentString) / pointSize
    startInterval = previousInterval - (step * (numberOfPoints-1))
    packedStrings.append( (startInterval,currentString) )

  fh.seek(archive['offset'])
  packedBasePoint = fh.read(pointSize)
  (baseInterval,baseValue) = struct.unpack(pointFormat,packedBasePoint)
  if baseInterval == 0:
    baseInterval = packedStrings[0][0]

  for (interval,packedString) in packedStrings:
    timeDistance = interval - baseInterval
    pointDistance = timeDistance / step
    byteDistance = pointDistance * pointSize
    myOffset = archive['offset'] + (byteDistance % archive['size'])
    fh.seek(myOffset)
    archiveEnd = archive['offset'] + archive['size']
    bytesBeyond = (myOffset + len(packedString)) - archiveEnd

    if bytesBeyond > 0:
      fh.write( packedString[:-bytesBeyond] )
      assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString))
      fh.seek( archive['offset'] )
      fh.write( packedString[-bytesBeyond:] )
    else:
      fh.write(packedString)

  higher = archive
  lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']]

  for lower in lowerArchives:
    fit = lambda i: i - (i % lower['secondsPerPoint'])
    lowerIntervals = [fit(p[0]) for p in alignedPoints]
    uniqueLowerIntervals = set(lowerIntervals)
    propagateFurther = False
    for interval in uniqueLowerIntervals:
      if __propagate(fh, header, interval, higher, lower):
        propagateFurther = True

    if not propagateFurther:
      break
    higher = lower

def info(path):
  """info(path)

  path is a string
  """
  fh = open(path,'rb')
  info = __readHeader(fh)
  fh.close()
  return info


def fetch(path,fromTime,untilTime=None):
  """fetch(path,fromTime,untilTime=None)

  path is a string
  fromTime is an epoch time
  untilTime is also an epoch time, but defaults to now
  """
  fh = open(path,'rb')
  return file_fetch(fh, fromTime, untilTime)

def file_fetch(fh, fromTime, untilTime):
  header = __readHeader(fh)
  now = int( time.time() )
  if untilTime is None:
    untilTime = now
  fromTime = int(fromTime)
  untilTime = int(untilTime)

  oldestTime = now - header['maxRetention']
  if fromTime < oldestTime:
    fromTime = oldestTime

  if not (fromTime < untilTime):
    raise InvalidTimeInterval("Invalid time interval")
  if untilTime > now:
    untilTime = now
  if untilTime < fromTime:
    untilTime = now

  diff = now - fromTime
  for archive in header['archives']:
    if archive['retention'] >= diff:
      break

  fromInterval = int( fromTime - (fromTime % archive['secondsPerPoint']) ) + archive['secondsPerPoint']
  untilInterval = int( untilTime - (untilTime % archive['secondsPerPoint']) ) + archive['secondsPerPoint']
  fh.seek(archive['offset'])
  packedPoint = fh.read(pointSize)
  (baseInterval,baseValue) = struct.unpack(pointFormat,packedPoint)

  if baseInterval == 0:
    step = archive['secondsPerPoint']
    points = (untilInterval - fromInterval) / step
    timeInfo = (fromInterval,untilInterval,step)
    valueList = [None] * points
    return (timeInfo,valueList)

  timeDistance = fromInterval - baseInterval
  pointDistance = timeDistance / archive['secondsPerPoint']
  byteDistance = pointDistance * pointSize
  fromOffset = archive['offset'] + (byteDistance % archive['size'])

  timeDistance = untilInterval - baseInterval
  pointDistance = timeDistance / archive['secondsPerPoint']
  byteDistance = pointDistance * pointSize
  untilOffset = archive['offset'] + (byteDistance % archive['size'])

  fh.seek(fromOffset)
  if fromOffset < untilOffset:
    seriesString = fh.read(untilOffset - fromOffset)
  else:
    archiveEnd = archive['offset'] + archive['size']
    seriesString = fh.read(archiveEnd - fromOffset)
    fh.seek(archive['offset'])
    seriesString += fh.read(untilOffset - archive['offset'])

  byteOrder,pointTypes = pointFormat[0],pointFormat[1:]
  points = len(seriesString) / pointSize
  seriesFormat = byteOrder + (pointTypes * points)
  unpackedSeries = struct.unpack(seriesFormat, seriesString)

  valueList = [None] * points
  currentInterval = fromInterval
  step = archive['secondsPerPoint']

  for i in xrange(0,len(unpackedSeries),2):
    pointTime = unpackedSeries[i]
    if pointTime == currentInterval:
      pointValue = unpackedSeries[i+1]
      valueList[i/2] = pointValue
    currentInterval += step

  fh.close()
  timeInfo = (fromInterval,untilInterval,step)
  return (timeInfo,valueList)

now = int( time.time() - 24*24*60 )
print 'update'
update("e:/tmp/ramoncounter-2.wsp", 12, now)

Note that ramoncounter.wsp was created at about 4PM on 2012/04/18 and kept receiving data points until 10AM on 2012/04/19, and I ran 'python whisper.py' at about 5PM on 2012/04/19. The last three lines of the amended whisper.py call the update() method, which is the most complicated one. The input timestamp parameter is 'int( time.time() - 24*24*60 )', i.e. 34560 seconds (about 9.6 hours) in the past; this offset is necessary so that the timestamp falls inside the window that already holds data points (between 4PM 2012/04/18 and 10AM 2012/04/19).
Below is the whisper.log:
INFO:root:diff(now - timestamp) is 34560
INFO:root:higher archive:{'retention': 604800, 'secondsPerPoint': 60, 'points': 10080, 'size': 120960, 'offset': 25972}
INFO:root:timestamp is 1334795580
INFO:root:lowerIntervalStart is 1334795400
INFO:root:lowerIntervalEnd is 1334796000
INFO:root:higherBaseInterval,higherBaseValue is 1334745540,0
INFO:root:higher['secondsPerPoint'] is 60
INFO:root:higherFirstOffset is 35944
INFO:root:relativeFirstOffset is 9972
INFO:root:relativeLastOffset is 10092
INFO:root:higherLastOffset is 10092
INFO:root:points is 10
INFO:root:unpackedSeries is (1334795400, 0.51666666666666672, 1334795460, 0.25, 1334795520, 0.40000000000000008, 1334795580, 12.0, 1334795640, 0.46666666666666673, 1334795700, 0.5, 1334795760, 0.31666666666666665, 1334795820, 0.71666666666666667, 1334795880, 0.56666666666666665, 1334795940, 0.5)
INFO:root:currentInterval is 1334795400
INFO:root:step is 60
INFO:root:neighborValues is [0.51666666666666672, 0.25, 0.40000000000000008, 12.0, 0.46666666666666673, 0.5, 0.31666666666666665, 0.71666666666666667, 0.56666666666666665, 0.5]
INFO:root:knownValues is [0.51666666666666672, 0.25, 0.40000000000000008, 12.0, 0.46666666666666673, 0.5, 0.31666666666666665, 0.71666666666666667, 0.56666666666666665, 0.5]
INFO:root:knownPercent is 1.000000
INFO:root:xff is 0.500000
INFO:root:aggregationMethod is average
INFO:root:aggregateValue is 1.623333
It is important to understand how whisper performs an update and propagates it to the lower-precision archives.
From the log you can see that the archive chosen for the update is {'retention': 604800, 'secondsPerPoint': 60, 'points': 10080, 'size': 120960, 'offset': 25972}, not the highest-precision one {'retention': 21600, 'secondsPerPoint': 10, 'points': 2160, 'size': 25920, 'offset': 52}. Why?
Because we ran 'python whisper.py' at 5PM on 2012/04/19 with a timestamp of 'int( time.time() - 24*24*60 )', the data point is 34560 seconds old. That is greater than the retention of the highest-precision archive (secondsPerPoint: 10, retention: 21600), so the next archive is selected. Just check the code for the details.
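The selection loop in file_update() makes this concrete. With diff = 34560, as logged above: Archive 0 has retention 21600 < 34560, so the loop continues past it; Archive 1 has retention 604800 >= 34560, so it becomes the target archive, and the remaining Archive 2 becomes the lowerArchives list that propagation will walk through.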
How does propagation occur?
It is best explained with an example (something the official Graphite documents really lack).
Suppose the input data point is "1334795700, 0.5" and it lands in the highest-precision archive (secondsPerPoint: 10); whisper then propagates it to the other 2 lower-precision archives. Let's focus on how it updates the archive with secondsPerPoint: 60 (a worked check against the log follows the steps).
1) Align the timestamp to the lower archive's interval:
lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
2) Find the position of that timestamp (lowerIntervalStart) in the higher-precision archive (secondsPerPoint: 10).
3) Read the next 6 data points from the archive with secondsPerPoint: 10. Why 6 points? secondsPerPoint of the lower archive / secondsPerPoint of the higher archive = 60/10 = 6.
4) Apply the aggregation method to those 6 data points.
5) Write the timestamp (lowerIntervalStart) and the aggregate value into the archive with secondsPerPoint: 60.
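The same steps can be checked against the log above, which captures the next hop of the propagation, from the 60-second archive into the 600-second archive: 600/60 = 10 points are read starting at lowerIntervalStart = 1334795400, all 10 values are known, so knownPercent = 1.0 >= xFilesFactor = 0.5, and the average (0.5167 + 0.25 + 0.4 + 12.0 + 0.4667 + 0.5 + 0.3167 + 0.7167 + 0.5667 + 0.5) / 10 = 1.6233 is exactly the aggregateValue written into the 600-second archive.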
---------------------------------------------------
Some terminology of Graphite's Whisper (a round-robin database):
- data point: a float value paired with a timestamp in seconds since the UNIX epoch (1970-01-01).
- resolution: the number of seconds per data point. For example, in our test statsd flushes the counter ('ramoncounter') every 10 seconds, so from whisper's perspective it receives one data point every 10 seconds and the resolution is 10 seconds. A 10-second resolution is higher (finer) than a 60-second resolution.