After reading the post Measure Anything, Measure Everything from Etsy, I decided to try out statsd and Graphite. Statsd is very simple to set up and easy to integrate with a system, but Graphite gave me a really hard time. At first I tried to set up Graphite on Ubuntu 8.04, and it ultimately failed because the system Python is not compatible with py2cairo.
After reinstalling with Ubuntu 10.04 server and following the instructions on how to install Graphite on Ubuntu, both statsd and Graphite started up successfully.
The latest version of Graphite is 0.9.9, and its documentation is sparse. I am looking forward to 1.0, which was planned for release on 2012-01-01 and is hugely delayed from its timeline.
Statsd is very simple: it is a node.js daemon, and its stats.js contains only about 300 lines of code. By reviewing its code, you can fully understand the concepts introduced in Measure Anything, Measure Everything, especially counters and timers.
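To make the counter and timer concepts concrete, here is a rough Python sketch of what a statsd flush conceptually does. This is only an illustration of the idea, not statsd's actual node.js code, and the emitted metric names are approximations:

FLUSH_INTERVAL = 10  # seconds; the flush interval used in this test

counters = {}
timers = {}

def handle_packet(line):
    # Parse one statsd line such as "ramoncounter:3|c" or "ramontimer:27|ms".
    name, rest = line.split(":", 1)
    value, kind = rest.split("|", 1)
    if kind == "c":
        counters[name] = counters.get(name, 0) + float(value)
    elif kind == "ms":
        timers.setdefault(name, []).append(float(value))

def flush():
    # Turn the accumulated values into the stats that get sent on to Graphite.
    stats = {}
    for name, total in counters.items():
        stats["stats." + name] = float(total) / FLUSH_INTERVAL  # per-second rate
        counters[name] = 0
    for name, samples in timers.items():
        if samples:
            samples.sort()
            stats["stats.timers." + name + ".lower"] = samples[0]
            stats["stats.timers." + name + ".upper"] = samples[-1]
            stats["stats.timers." + name + ".mean"] = sum(samples) / len(samples)
        timers[name] = []
    return stats

handle_packet("ramoncounter:3|c")
handle_packet("ramontimer:27|ms")
print flush()  # e.g. {'stats.ramoncounter': 0.3, 'stats.timers.ramontimer.mean': 27.0, ...}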
Now it is time to run a test.
1. Launch Graphite (by default Graphite is installed to /opt/graphite).
> sudo /etc/init.d/apache2 restart # start Apache (the graphite-web frontend)
> cd /opt/graphite/
> sudo ./bin/carbon-cache.py start # start carbon to receive incoming stats
2. Launch Statsd
> cd $STATSD_HOME
> sudo cp exampleConfig.js myconfig.js # change the configuration according to your requirement
> sudo node stats.js myconfig.js
3. Run a statsd client (here, a small Java client; compile the Main class below and run it):
> java Main
import java.util.Random;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;

public class Main {
    public static void main(String args[]) throws Exception {
        DatagramSocket socket = new DatagramSocket();
        while (true) {
            // send counter stats
            int c = new Random().nextInt(10);
            String data = "ramoncounter:" + c + "|c";
            byte[] d = data.getBytes();
            socket.send(new DatagramPacket(d, d.length, InetAddress.getByName("192.168.2.221"), 8125));
            System.out.println("[Send] " + data + "...Sleep 10 seconds!");

            // send timer stats
            data = "";
            data += "ramontimer:" + new Random().nextInt(30) + "|ms";
            d = data.getBytes();
            socket.send(new DatagramPacket(d, d.length, InetAddress.getByName("192.168.2.221"), 8125));
            System.out.println("[Send] " + data + "...Sleep 10 seconds!");

            Thread.sleep(10 * 1000);
        }
    }
}
Now open a web browser and go to http://GRAPHITE_HOST; you will be greeted by the Graphite web UI.
How does Graphite maintain the incoming data points?
This is a big topic. Graphite consists of 3 components:
- carbon - a Twisted daemon that listens for time-series data (a small sketch of its plaintext input format follows this list).
- whisper - a simple database library for storing time-series data (similar in design to RRD)
- graphite webapp - A Django webapp that renders graphs on-demand using Cairo
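Regarding carbon: carbon-cache accepts data points over a very simple plaintext protocol, one "metric value timestamp" line per point, by default on TCP port 2003; this is also the interface statsd's graphite backend flushes its stats through. A minimal Python sketch, assuming the default port and the host used in this test:

import socket
import time

# Send one data point straight to carbon's plaintext listener.
# Host and port are assumptions: adjust them to your carbon configuration.
sock = socket.create_connection(("192.168.2.221", 2003))
sock.sendall("stats.ramoncounter 0.3 %d\n" % int(time.time()))
sock.close()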
As the official documentation is very limited and lacks examples, it is hard to really understand how whisper works. I modified whisper.py to output more log messages in order to understand its mechanism.
The main functions of whisper.py are listed below; a short usage sketch follows the list:
- def create(path,archiveList,xFilesFactor=None,aggregationMethod=None)
- def update(path,value,timestamp=None)
- def fetch(path,fromTime,untilTime=None)
- def info(path) - get header information.
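To get a feel for these functions, here is a small usage sketch based on the signatures above (the file path is hypothetical, and the archive list mirrors the one created for ramoncounter below):

import time
import whisper  # the whisper.py module discussed here

path = '/tmp/ramoncounter-test.wsp'  # hypothetical path, just for illustration

# 10-second points for 6 hours, 60-second points for 7 days, 600-second points for ~5 years.
# create() refuses to overwrite an existing file.
whisper.create(path, [(10, 2160), (60, 10080), (600, 262974)],
               xFilesFactor=0.5, aggregationMethod='average')

now = int(time.time())
whisper.update(path, 0.5, now)                        # write one data point
print whisper.info(path)                              # header: metadata + archive list
(timeInfo, values) = whisper.fetch(path, now - 3600)  # last hour of data
print timeInfo  # (fromInterval, untilInterval, step)
print values    # one slot per step, None where no data point exists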
By running the test client (Main.class), I got a ramoncounter.wsp file that contains all data points of the given metric. Its header information is shown below:
maxRetention: 157784400
xFilesFactor: 0.5
aggregationMethod: average
fileSize: 3302620

Archive 0
retention: 21600
secondsPerPoint: 10
points: 2160
size: 25920
offset: 52

Archive 1
retention: 604800
secondsPerPoint: 60
points: 10080
size: 120960
offset: 25972

Archive 2
retention: 157784400
secondsPerPoint: 600
points: 262974
size: 3155688
offset: 146932
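These numbers are consistent with the file layout described in whisper.py's header comment: the header is 16 bytes of metadata plus 12 bytes of archive info per archive, each archive then occupies points * 12 bytes (one packed '!Ld' point), and the offsets and fileSize are simply running sums. A quick check:

import struct

metadataSize = struct.calcsize("!2LfL")   # 16 bytes: aggregationType, maxRetention, xFilesFactor, archiveCount
archiveInfoSize = struct.calcsize("!3L")  # 12 bytes per archive: offset, secondsPerPoint, points
pointSize = struct.calcsize("!Ld")        # 12 bytes per data point: timestamp, value

archives = [(10, 2160), (60, 10080), (600, 262974)]

offset = metadataSize + archiveInfoSize * len(archives)  # 52, the offset of Archive 0
for secondsPerPoint, points in archives:
    size = points * pointSize
    print "offset=%d retention=%d size=%d" % (offset, secondsPerPoint * points, size)
    offset += size
print "fileSize=%d" % offset  # 3302620, matching the header dump above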
The amended whisper.py:
1 #!/usr/bin/env python 2 # Copyright 2008 Orbitz WorldWide 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 # 16 # 17 # This module is an implementation of the Whisper database API 18 # Here is the basic layout of a whisper data file 19 # 20 # File = Header,Data 21 # Header = Metadata,ArchiveInfo+ 22 # Metadata = aggregationType,maxRetention,xFilesFactor,archiveCount 23 # ArchiveInfo = Offset,SecondsPerPoint,Points 24 # Data = Archive+ 25 # Archive = Point+ 26 # Point = timestamp,value 27 28 import os, struct, time, logging 29 30 logging.basicConfig(filename="whisper.log",level=logging.DEBUG) 31 32 try: 33 import fcntl 34 CAN_LOCK = True 35 except ImportError: 36 CAN_LOCK = False 37 38 LOCK = False 39 CACHE_HEADERS = False 40 AUTOFLUSH = False 41 __headerCache = {} 42 43 longFormat = "!L" 44 longSize = struct.calcsize(longFormat) 45 floatFormat = "!f" 46 floatSize = struct.calcsize(floatFormat) 47 valueFormat = "!d" 48 valueSize = struct.calcsize(valueFormat) 49 pointFormat = "!Ld" 50 pointSize = struct.calcsize(pointFormat) 51 metadataFormat = "!2LfL" 52 metadataSize = struct.calcsize(metadataFormat) 53 archiveInfoFormat = "!3L" 54 archiveInfoSize = struct.calcsize(archiveInfoFormat) 55 56 aggregationTypeToMethod = dict({ 57 1: 'average', 58 2: 'sum', 59 3: 'last', 60 4: 'max', 61 5: 'min' 62 }) 63 aggregationMethodToType = dict([[v,k] for k,v in aggregationTypeToMethod.items()]) 64 aggregationMethods = aggregationTypeToMethod.values() 65 66 debug = startBlock = endBlock = lambda *a,**k: None 67 68 UnitMultipliers = { 69 's' : 1, 70 'm' : 60, 71 'h' : 60 * 60, 72 'd' : 60 * 60 * 24, 73 'y' : 60 * 60 * 24 * 365, 74 } 75 76 77 def parseRetentionDef(retentionDef): 78 (precision, points) = retentionDef.strip().split(':') 79 80 if precision.isdigit(): 81 precisionUnit = 's' 82 precision = int(precision) 83 else: 84 precisionUnit = precision[-1] 85 precision = int( precision[:-1] ) 86 87 if points.isdigit(): 88 pointsUnit = None 89 points = int(points) 90 else: 91 pointsUnit = points[-1] 92 points = int( points[:-1] ) 93 94 if precisionUnit not in UnitMultipliers: 95 raise ValueError("Invalid unit: '%s'" % precisionUnit) 96 97 if pointsUnit not in UnitMultipliers and pointsUnit is not None: 98 raise ValueError("Invalid unit: '%s'" % pointsUnit) 99 100 precision = precision * UnitMultipliers[precisionUnit] 101 102 if pointsUnit: 103 points = points * UnitMultipliers[pointsUnit] / precision 104 105 return (precision, points) 106 107 class WhisperException(Exception): 108 """Base class for whisper exceptions.""" 109 110 111 class InvalidConfiguration(WhisperException): 112 """Invalid configuration.""" 113 114 115 class InvalidAggregationMethod(WhisperException): 116 """Invalid aggregation method.""" 117 118 119 class InvalidTimeInterval(WhisperException): 120 """Invalid time interval.""" 121 122 123 class TimestampNotCovered(WhisperException): 124 """Timestamp not covered by any archives in this database.""" 125 126 class CorruptWhisperFile(WhisperException): 127 def __init__(self, 
error, path): 128 Exception.__init__(self, error) 129 self.error = error 130 self.path = path 131 132 def __repr__(self): 133 return "<CorruptWhisperFile[%s] %s>" % (self.path, self.error) 134 135 def __str__(self): 136 return "%s (%s)" % (self.error, self.path) 137 138 def enableDebug(): 139 global open, debug, startBlock, endBlock 140 class open(file): 141 def __init__(self,*args,**kwargs): 142 file.__init__(self,*args,**kwargs) 143 self.writeCount = 0 144 self.readCount = 0 145 146 def write(self,data): 147 self.writeCount += 1 148 debug('WRITE %d bytes #%d' % (len(data),self.writeCount)) 149 return file.write(self,data) 150 151 def read(self,bytes): 152 self.readCount += 1 153 debug('READ %d bytes #%d' % (bytes,self.readCount)) 154 return file.read(self,bytes) 155 156 def debug(message): 157 print 'DEBUG :: %s' % message 158 159 __timingBlocks = {} 160 161 def startBlock(name): 162 __timingBlocks[name] = time.time() 163 164 def endBlock(name): 165 debug("%s took %.5f seconds" % (name,time.time() - __timingBlocks.pop(name))) 166 167 168 def __readHeader(fh): 169 info = __headerCache.get(fh.name) 170 if info: 171 return info 172 173 originalOffset = fh.tell() 174 fh.seek(0) 175 packedMetadata = fh.read(metadataSize) 176 177 try: 178 (aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata) 179 except: 180 raise CorruptWhisperFile("Unable to read header", fh.name) 181 182 archives = [] 183 184 for i in xrange(archiveCount): 185 packedArchiveInfo = fh.read(archiveInfoSize) 186 try: 187 (offset,secondsPerPoint,points) = struct.unpack(archiveInfoFormat,packedArchiveInfo) 188 except: 189 raise CorruptWhisperFile("Unable to read archive %d metadata" % i, fh.name) 190 191 archiveInfo = { 192 'offset' : offset, 193 'secondsPerPoint' : secondsPerPoint, 194 'points' : points, 195 'retention' : secondsPerPoint * points, 196 'size' : points * pointSize, 197 } 198 archives.append(archiveInfo) 199 200 fh.seek(originalOffset) 201 info = { 202 'aggregationMethod' : aggregationTypeToMethod.get(aggregationType, 'average'), 203 'maxRetention' : maxRetention, 204 'xFilesFactor' : xff, 205 'archives' : archives, 206 } 207 if CACHE_HEADERS: 208 __headerCache[fh.name] = info 209 210 return info 211 212 213 def setAggregationMethod(path, aggregationMethod): 214 """setAggregationMethod(path,aggregationMethod) 215 216 path is a string 217 aggregationMethod specifies the method to use when propogating data (see ``whisper.aggregationMethods``) 218 """ 219 fh = open(path,'r+b') 220 if LOCK: 221 fcntl.flock( fh.fileno(), fcntl.LOCK_EX ) 222 223 packedMetadata = fh.read(metadataSize) 224 225 try: 226 (aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata) 227 except: 228 raise CorruptWhisperFile("Unable to read header", fh.name) 229 230 try: 231 newAggregationType = struct.pack( longFormat, aggregationMethodToType[aggregationMethod] ) 232 except KeyError: 233 raise InvalidAggregationMethod("Unrecognized aggregation method: %s" % 234 aggregationMethod) 235 236 fh.seek(0) 237 fh.write(newAggregationType) 238 239 if AUTOFLUSH: 240 fh.flush() 241 os.fsync(fh.fileno()) 242 243 if CACHE_HEADERS and fh.name in __headerCache: 244 del __headerCache[fh.name] 245 246 fh.close() 247 248 return aggregationTypeToMethod.get(aggregationType, 'average') 249 250 251 def validateArchiveList(archiveList): 252 """ Validates an archiveList. 
archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints) 253 An ArchiveList must: 254 1. Have at least one archive config. Example: (60, 86400) 255 2. No archive may be a duplicate of another. 256 3. Higher precision archives' precision must evenly divide all lower precision archives' precision. 257 4. Lower precision archives must cover larger time intervals than higher precision archives. 258 259 Returns True or False 260 """ 261 262 try: 263 if not archiveList: 264 raise InvalidConfiguration("You must specify at least one archive configuration!") 265 266 archiveList.sort(key=lambda a: a[0]) #sort by precision (secondsPerPoint) 267 268 for i,archive in enumerate(archiveList): 269 if i == len(archiveList) - 1: 270 break 271 272 next = archiveList[i+1] 273 if not (archive[0] < next[0]): 274 raise InvalidConfiguration("You cannot configure two archives " 275 "with the same precision %s,%s" % (archive,next)) 276 277 if (next[0] % archive[0]) != 0: 278 raise InvalidConfiguration("Higher precision archives' precision " 279 "must evenly divide all lower precision archives' precision %s,%s" \ 280 % (archive[0],next[0])) 281 282 retention = archive[0] * archive[1] 283 nextRetention = next[0] * next[1] 284 285 if not (nextRetention > retention): 286 raise InvalidConfiguration("Lower precision archives must cover " 287 "larger time intervals than higher precision archives %s,%s" \ 288 % (archive,next)) 289 290 except: 291 # RAMON: no exceptions will be thrown out??? 292 return False 293 return True 294 295 def create(path,archiveList,xFilesFactor=None,aggregationMethod=None): 296 """create(path,archiveList,xFilesFactor=0.5,aggregationMethod='average') 297 archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints) 298 path is a string 299 archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints) 300 xFilesFactor specifies the fraction of data points in a propagation interval that must have known values for a propagation to occur 301 aggregationMethod specifies the function to use when propogating data (see ``whisper.aggregationMethods``) 302 """ 303 # Set default params 304 if xFilesFactor is None: 305 xFilesFactor = 0.5 306 if aggregationMethod is None: 307 aggregationMethod = 'average' 308 309 #Validate archive configurations... 310 validArchive = validateArchiveList(archiveList) 311 if not validArchive: 312 raise InvalidConfiguration("There was a problem creating %s due to an invalid schema config." % path) 313 314 #Looks good, now we create the file and write the header 315 if os.path.exists(path): 316 raise InvalidConfiguration("File %s already exists!" 
% path) 317 318 fh = open(path,'wb') 319 if LOCK: 320 fcntl.flock( fh.fileno(), fcntl.LOCK_EX ) 321 322 aggregationType = struct.pack( longFormat, aggregationMethodToType.get(aggregationMethod, 1) ) 323 # for example a archieve list, [[10,2160],[60,10080],[600,262974]] 324 oldest = sorted([secondsPerPoint * points for secondsPerPoint,points in archiveList])[-1] 325 # the retention of [600,262974] is 600*262974 326 maxRetention = struct.pack( longFormat, oldest ) 327 # xFilesFactor = 0.5 328 xFilesFactor = struct.pack( floatFormat, float(xFilesFactor) ) 329 # archiveCount = 3 330 archiveCount = struct.pack(longFormat, len(archiveList)) 331 packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount 332 fh.write(packedMetadata) 333 # !2LfL means L+L+f+L 334 # Metadata !2LfL aggregationType,maxRetention,xFilesFactor,archiveCount 335 # ArchiveInfo !3L Offset,SecondsPerPoint,Points 336 headerSize = metadataSize + (archiveInfoSize * len(archiveList)) 337 archiveOffsetPointer = headerSize 338 339 for secondsPerPoint,points in archiveList: 340 # record the start point(offset) of each archieve. 341 archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points) 342 fh.write(archiveInfo) 343 archiveOffsetPointer += (points * pointSize) 344 345 # perserver the disk space for all archives. 346 zeroes = '\x00' * (archiveOffsetPointer - headerSize) 347 fh.write(zeroes) 348 349 if AUTOFLUSH: 350 fh.flush() 351 os.fsync(fh.fileno()) 352 353 fh.close() 354 355 def __aggregate(aggregationMethod, knownValues): 356 if aggregationMethod == 'average': 357 return float(sum(knownValues)) / float(len(knownValues)) 358 elif aggregationMethod == 'sum': 359 return float(sum(knownValues)) 360 elif aggregationMethod == 'last': 361 return knownValues[len(knownValues)-1] 362 elif aggregationMethod == 'max': 363 return max(knownValues) 364 elif aggregationMethod == 'min': 365 return min(knownValues) 366 else: 367 raise InvalidAggregationMethod("Unrecognized aggregation method %s" % 368 aggregationMethod) 369 370 def __propagate(fh,header,timestamp,higher,lower): 371 aggregationMethod = header['aggregationMethod'] 372 xff = header['xFilesFactor'] 373 374 # guarantee the timestamp can be evenly divided by secondsPerPoint...but 375 # it will lose data precision, right?? 
376 lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint']) 377 logging.info("timestamp is %d" % timestamp) 378 logging.info("lowerIntervalStart is %d" % lowerIntervalStart) 379 lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint'] 380 logging.info("lowerIntervalEnd is %d" % lowerIntervalEnd) 381 382 fh.seek(higher['offset']) 383 packedPoint = fh.read(pointSize) 384 (higherBaseInterval,higherBaseValue) = struct.unpack(pointFormat,packedPoint) 385 logging.info("higherBaseInterval,higherBaseValue is %d,%d" % (higherBaseInterval,higherBaseValue)) 386 387 if higherBaseInterval == 0: 388 higherFirstOffset = higher['offset'] 389 else: 390 timeDistance = lowerIntervalStart - higherBaseInterval 391 pointDistance = timeDistance / higher['secondsPerPoint'] 392 logging.info("higher['secondsPerPoint'] is %d" % higher['secondsPerPoint']) 393 byteDistance = pointDistance * pointSize 394 # higher[:higherFirstOffset] the first data point till the data point which timestamp matches the given timestamp 395 higherFirstOffset = higher['offset'] + (byteDistance % higher['size']) 396 logging.info("higherFirstOffset is %d" % higherFirstOffset) 397 398 higherPoints = lower['secondsPerPoint'] / higher['secondsPerPoint'] 399 higherSize = higherPoints * pointSize 400 relativeFirstOffset = higherFirstOffset - higher['offset'] 401 logging.info("relativeFirstOffset is %d" % relativeFirstOffset) 402 relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size'] 403 logging.info("relativeLastOffset is %d" % relativeLastOffset) 404 higherLastOffset = relativeLastOffset + higher['offset'] 405 logging.info("higherLastOffset is %d" % relativeLastOffset) 406 fh.seek(higherFirstOffset) 407 408 if higherFirstOffset < higherLastOffset: #we don't wrap the archive 409 seriesString = fh.read(higherLastOffset - higherFirstOffset) 410 else: #We do wrap the archive..round robin 411 higherEnd = higher['offset'] + higher['size'] 412 seriesString = fh.read(higherEnd - higherFirstOffset) 413 fh.seek(higher['offset']) 414 seriesString += fh.read(higherLastOffset - higher['offset']) 415 416 #Now we unpack the series data we just read 417 byteOrder,pointTypes = pointFormat[0],pointFormat[1:] 418 points = len(seriesString) / pointSize 419 logging.info("points is %d" % points) 420 seriesFormat = byteOrder + (pointTypes * points) 421 unpackedSeries = struct.unpack(seriesFormat, seriesString) 422 logging.info("unpackedSeries is %s" % str(unpackedSeries)) 423 424 #And finally we construct a list of values 425 neighborValues = [None] * points 426 currentInterval = lowerIntervalStart 427 logging.info("currentInterval is %d" % currentInterval) 428 step = higher['secondsPerPoint'] 429 logging.info("step is %d" % step) 430 431 # the value of unpackedSeries will like below: 432 # (1334806980, 0.29999999999999999, 1334806990, 0.20000000000000001, 1334828600, 12.0, 1334807010, 0.29999999999999999, 1334807020, 0.5) 433 # and the xrange(0,len(unpackedSeries),2) will return [0,2,4,6...] 434 for i in xrange(0,len(unpackedSeries),2): 435 pointTime = unpackedSeries[i] 436 if pointTime == currentInterval: 437 # what does this mean???...check above comments. 
438 neighborValues[i/2] = unpackedSeries[i+1] 439 currentInterval += step 440 logging.info("neighborValues is %s" % str(neighborValues)) 441 442 #Propagate aggregateValue to propagate from neighborValues if we have enough known points 443 knownValues = [v for v in neighborValues if v is not None] 444 if not knownValues: 445 return False 446 logging.info("knownValues is %s" % str(knownValues)) 447 448 knownPercent = float(len(knownValues)) / float(len(neighborValues)) 449 logging.info("knownPercent is %f" % knownPercent) 450 logging.info("xff is %f" % xff) 451 if knownPercent >= xff: #we have enough data to propagate a value! 452 logging.info("aggregationMethod is %s" % str(aggregationMethod)) 453 aggregateValue = __aggregate(aggregationMethod, knownValues) 454 logging.info("aggregateValue is %f" % aggregateValue) 455 myPackedPoint = struct.pack(pointFormat,lowerIntervalStart,aggregateValue) 456 fh.seek(lower['offset']) 457 packedPoint = fh.read(pointSize) 458 (lowerBaseInterval,lowerBaseValue) = struct.unpack(pointFormat,packedPoint) 459 460 # create or update 461 if lowerBaseInterval == 0: #First propagated update to this lower archive 462 fh.seek(lower['offset']) 463 fh.write(myPackedPoint) 464 else: #Not our first propagated update to this lower archive 465 timeDistance = lowerIntervalStart - lowerBaseInterval 466 pointDistance = timeDistance / lower['secondsPerPoint'] 467 byteDistance = pointDistance * pointSize 468 lowerOffset = lower['offset'] + (byteDistance % lower['size']) 469 fh.seek(lowerOffset) 470 fh.write(myPackedPoint) 471 472 return True 473 474 else: 475 return False 476 477 478 def update(path,value,timestamp=None): 479 """update(path,value,timestamp=None) 480 481 path is a string 482 value is a float 483 timestamp is either an int or float 484 """ 485 value = float(value) 486 fh = open(path,'r+b') 487 return file_update(fh, value, timestamp) 488 489 490 def file_update(fh, value, timestamp): 491 if LOCK: 492 fcntl.flock( fh.fileno(), fcntl.LOCK_EX ) 493 494 header = __readHeader(fh) 495 now = int( time.time() ) 496 if timestamp is None: 497 timestamp = now 498 499 timestamp = int(timestamp) 500 diff = now - timestamp 501 logging.info("diff(now - timestamp) is %d" % diff) 502 if not ((diff < header['maxRetention']) and diff >= 0): 503 raise TimestampNotCovered("Timestamp not covered by any archives in " 504 "this database.") 505 506 # for [[10,2160],[60,10080],[600,262974]], [10,2160] is the highest-precision archive, while 507 # [600,262974] is the lowest-precision archive...the archive list is sorted from highest-precision 508 # to lowest-precision. 509 for i,archive in enumerate(header['archives']): #Find the highest-precision archive that covers timestamp 510 if archive['retention'] < diff: continue 511 lowerArchives = header['archives'][i+1:] #We'll pass on the update to these lower precision archives later 512 break 513 514 # The scope of variable 'archive' in for loop is beyond the for loop...a little strange feature! 515 # First we update the highest-precision archive 516 myInterval = timestamp - (timestamp % archive['secondsPerPoint']) 517 myPackedPoint = struct.pack(pointFormat,myInterval,value) 518 fh.seek(archive['offset']) 519 packedPoint = fh.read(pointSize) 520 (baseInterval,baseValue) = struct.unpack(pointFormat,packedPoint) 521 522 if baseInterval == 0: #This file's first update 523 # seek(offset) will reach the absolute position specified by offset. 
524 fh.seek(archive['offset']) 525 fh.write(myPackedPoint) 526 baseInterval,baseValue = myInterval,value 527 else: #Not our first update 528 timeDistance = myInterval - baseInterval 529 pointDistance = timeDistance / archive['secondsPerPoint'] 530 byteDistance = pointDistance * pointSize 531 # byteDistance % archive['size'] round-robin 532 myOffset = archive['offset'] + (byteDistance % archive['size']) 533 fh.seek(myOffset) 534 fh.write(myPackedPoint) 535 536 #Now we propagate the update to lower-precision archives 537 higher = archive 538 logging.info("higher archive:" + str(higher)) 539 for lower in lowerArchives: 540 if not __propagate(fh, header, myInterval, higher, lower): 541 break 542 higher = lower 543 544 if AUTOFLUSH: 545 fh.flush() 546 os.fsync(fh.fileno()) 547 548 fh.close() 549 550 551 def update_many(path,points): 552 """update_many(path,points) 553 554 path is a string 555 points is a list of (timestamp,value) points 556 """ 557 if not points: return 558 points = [ (int(t),float(v)) for (t,v) in points] 559 points.sort(key=lambda p: p[0],reverse=True) #order points by timestamp, newest first 560 fh = open(path,'r+b') 561 return file_update_many(fh, points) 562 563 564 def file_update_many(fh, points): 565 if LOCK: 566 fcntl.flock( fh.fileno(), fcntl.LOCK_EX ) 567 568 header = __readHeader(fh) 569 now = int( time.time() ) 570 archives = iter( header['archives'] ) 571 currentArchive = archives.next() 572 currentPoints = [] 573 574 for point in points: 575 age = now - point[0] 576 577 while currentArchive['retention'] < age: #we can't fit any more points in this archive 578 if currentPoints: #commit all the points we've found that it can fit 579 currentPoints.reverse() #put points in chronological order 580 __archive_update_many(fh,header,currentArchive,currentPoints) 581 currentPoints = [] 582 try: 583 currentArchive = archives.next() 584 except StopIteration: 585 currentArchive = None 586 break 587 588 if not currentArchive: 589 break #drop remaining points that don't fit in the database 590 591 currentPoints.append(point) 592 593 if currentArchive and currentPoints: #don't forget to commit after we've checked all the archives 594 currentPoints.reverse() 595 __archive_update_many(fh,header,currentArchive,currentPoints) 596 597 if AUTOFLUSH: 598 fh.flush() 599 os.fsync(fh.fileno()) 600 601 fh.close() 602 603 604 def __archive_update_many(fh,header,archive,points): 605 step = archive['secondsPerPoint'] 606 alignedPoints = [ (timestamp - (timestamp % step), value) 607 for (timestamp,value) in points ] 608 #Create a packed string for each contiguous sequence of points 609 packedStrings = [] 610 previousInterval = None 611 currentString = "" 612 for (interval,value) in alignedPoints: 613 if (not previousInterval) or (interval == previousInterval + step): 614 currentString += struct.pack(pointFormat,interval,value) 615 previousInterval = interval 616 else: 617 numberOfPoints = len(currentString) / pointSize 618 startInterval = previousInterval - (step * (numberOfPoints-1)) 619 packedStrings.append( (startInterval,currentString) ) 620 currentString = struct.pack(pointFormat,interval,value) 621 previousInterval = interval 622 if currentString: 623 numberOfPoints = len(currentString) / pointSize 624 startInterval = previousInterval - (step * (numberOfPoints-1)) 625 packedStrings.append( (startInterval,currentString) ) 626 627 #Read base point and determine where our writes will start 628 fh.seek(archive['offset']) 629 packedBasePoint = fh.read(pointSize) 630 (baseInterval,baseValue) = 
struct.unpack(pointFormat,packedBasePoint) 631 if baseInterval == 0: #This file's first update 632 baseInterval = packedStrings[0][0] #use our first string as the base, so we start at the start 633 634 #Write all of our packed strings in locations determined by the baseInterval 635 for (interval,packedString) in packedStrings: 636 timeDistance = interval - baseInterval 637 pointDistance = timeDistance / step 638 byteDistance = pointDistance * pointSize 639 myOffset = archive['offset'] + (byteDistance % archive['size']) 640 fh.seek(myOffset) 641 archiveEnd = archive['offset'] + archive['size'] 642 bytesBeyond = (myOffset + len(packedString)) - archiveEnd 643 644 if bytesBeyond > 0: 645 fh.write( packedString[:-bytesBeyond] ) 646 assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString)) 647 fh.seek( archive['offset'] ) 648 fh.write( packedString[-bytesBeyond:] ) #safe because it can't exceed the archive (retention checking logic above) 649 else: 650 fh.write(packedString) 651 652 #Now we propagate the updates to lower-precision archives 653 higher = archive 654 lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']] 655 656 for lower in lowerArchives: 657 fit = lambda i: i - (i % lower['secondsPerPoint']) 658 lowerIntervals = [fit(p[0]) for p in alignedPoints] 659 uniqueLowerIntervals = set(lowerIntervals) 660 propagateFurther = False 661 for interval in uniqueLowerIntervals: 662 if __propagate(fh, header, interval, higher, lower): 663 propagateFurther = True 664 665 if not propagateFurther: 666 break 667 higher = lower 668 669 670 def info(path): 671 """info(path) 672 673 path is a string 674 """ 675 fh = open(path,'rb') 676 info = __readHeader(fh) 677 fh.close() 678 return info 679 680 681 def fetch(path,fromTime,untilTime=None): 682 """fetch(path,fromTime,untilTime=None) 683 684 path is a string 685 fromTime is an epoch time 686 untilTime is also an epoch time, but defaults to now 687 """ 688 fh = open(path,'rb') 689 return file_fetch(fh, fromTime, untilTime) 690 691 692 def file_fetch(fh, fromTime, untilTime): 693 header = __readHeader(fh) 694 now = int( time.time() ) 695 if untilTime is None: 696 untilTime = now 697 fromTime = int(fromTime) 698 untilTime = int(untilTime) 699 700 oldestTime = now - header['maxRetention'] 701 if fromTime < oldestTime: 702 fromTime = oldestTime 703 704 if not (fromTime < untilTime): 705 raise InvalidTimeInterval("Invalid time interval") 706 if untilTime > now: 707 untilTime = now 708 if untilTime < fromTime: 709 untilTime = now 710 711 diff = now - fromTime 712 for archive in header['archives']: 713 if archive['retention'] >= diff: 714 break 715 716 fromInterval = int( fromTime - (fromTime % archive['secondsPerPoint']) ) + archive['secondsPerPoint'] 717 untilInterval = int( untilTime - (untilTime % archive['secondsPerPoint']) ) + archive['secondsPerPoint'] 718 fh.seek(archive['offset']) 719 packedPoint = fh.read(pointSize) 720 (baseInterval,baseValue) = struct.unpack(pointFormat,packedPoint) 721 722 if baseInterval == 0: 723 step = archive['secondsPerPoint'] 724 points = (untilInterval - fromInterval) / step 725 timeInfo = (fromInterval,untilInterval,step) 726 valueList = [None] * points 727 return (timeInfo,valueList) 728 729 #Determine fromOffset 730 timeDistance = fromInterval - baseInterval 731 pointDistance = timeDistance / archive['secondsPerPoint'] 732 byteDistance = pointDistance * pointSize 733 fromOffset = 
archive['offset'] + (byteDistance % archive['size']) 734 735 #Determine untilOffset 736 timeDistance = untilInterval - baseInterval 737 pointDistance = timeDistance / archive['secondsPerPoint'] 738 byteDistance = pointDistance * pointSize 739 untilOffset = archive['offset'] + (byteDistance % archive['size']) 740 741 #Read all the points in the interval 742 fh.seek(fromOffset) 743 if fromOffset < untilOffset: #If we don't wrap around the archive 744 seriesString = fh.read(untilOffset - fromOffset) 745 else: #We do wrap around the archive, so we need two reads 746 archiveEnd = archive['offset'] + archive['size'] 747 seriesString = fh.read(archiveEnd - fromOffset) 748 fh.seek(archive['offset']) 749 seriesString += fh.read(untilOffset - archive['offset']) 750 751 #Now we unpack the series data we just read (anything faster than unpack?) 752 byteOrder,pointTypes = pointFormat[0],pointFormat[1:] 753 points = len(seriesString) / pointSize 754 seriesFormat = byteOrder + (pointTypes * points) 755 unpackedSeries = struct.unpack(seriesFormat, seriesString) 756 757 #And finally we construct a list of values (optimize this!) 758 valueList = [None] * points #pre-allocate entire list for speed 759 currentInterval = fromInterval 760 step = archive['secondsPerPoint'] 761 762 for i in xrange(0,len(unpackedSeries),2): 763 pointTime = unpackedSeries[i] 764 if pointTime == currentInterval: 765 pointValue = unpackedSeries[i+1] 766 valueList[i/2] = pointValue #in-place reassignment is faster than append() 767 currentInterval += step 768 769 fh.close() 770 timeInfo = (fromInterval,untilInterval,step) 771 return (timeInfo,valueList) 772 773 now = int( time.time() - 24*24*60 ) 774 print 'update' 775 update("e:/tmp/ramoncounter-2.wsp", 12, now) 776 777
Note that ramoncounter.wsp was created at about 4PM on 2012/04/18 and kept receiving data points until 10AM on 2012/04/19, and I ran 'python whisper.py' at about 5PM on 2012/04/19. The last 3 lines of whisper.py call the update() method, which is the most complicated one, and the input timestamp parameter is 'int( time.time() - 24*24*60 )'. Backdating the timestamp this way is necessary so that it falls within the period already covered by data points, i.e. between 4PM 2012/04/18 and 10AM 2012/04/19.
Below is the whisper.log:
INFO:root:diff(now - timestamp) is 34560
INFO:root:higher archive:{'retention': 604800, 'secondsPerPoint': 60, 'points': 10080, 'size': 120960, 'offset': 25972}
INFO:root:timestamp is 1334795580
INFO:root:lowerIntervalStart is 1334795400
INFO:root:lowerIntervalEnd is 1334796000
INFO:root:higherBaseInterval,higherBaseValue is 1334745540,0
INFO:root:higher['secondsPerPoint'] is 60
INFO:root:higherFirstOffset is 35944
INFO:root:relativeFirstOffset is 9972
INFO:root:relativeLastOffset is 10092
INFO:root:higherLastOffset is 10092
INFO:root:points is 10
INFO:root:unpackedSeries is (1334795400, 0.51666666666666672, 1334795460, 0.25, 1334795520, 0.40000000000000008, 1334795580, 12.0, 1334795640, 0.46666666666666673, 1334795700, 0.5, 1334795760, 0.31666666666666665, 1334795820, 0.71666666666666667, 1334795880, 0.56666666666666665, 1334795940, 0.5)
INFO:root:currentInterval is 1334795400
INFO:root:step is 60
INFO:root:neighborValues is [0.51666666666666672, 0.25, 0.40000000000000008, 12.0, 0.46666666666666673, 0.5, 0.31666666666666665, 0.71666666666666667, 0.56666666666666665, 0.5]
INFO:root:knownValues is [0.51666666666666672, 0.25, 0.40000000000000008, 12.0, 0.46666666666666673, 0.5, 0.31666666666666665, 0.71666666666666667, 0.56666666666666665, 0.5]
INFO:root:knownPercent is 1.000000
INFO:root:xff is 0.500000
INFO:root:aggregationMethod is average
INFO:root:aggregateValue is 1.623333

It is important to understand how whisper updates a data point and propagates the update to lower-precision archives.
Based on the log, you can see that the first archive selected is 'archive:{'retention': 604800, 'secondsPerPoint': 60, 'points': 10080, 'size': 120960, 'offset': 25972}', not 'archive:{'retention': 21600, 'secondsPerPoint': 10, 'points': 2160, 'size': 25920, 'offset': 52}'. Why?
Because the test update used the timestamp 'int( time.time() - 24*24*60 )', the data point is 34560 seconds (about 9.6 hours) in the past; the log confirms this with 'diff(now - timestamp) is 34560'. That is greater than the retention of the highest-precision archive (secondsPerPoint: 10, retention: 21600 seconds, i.e. 6 hours), so that archive cannot cover the timestamp and the next archive (secondsPerPoint: 60, retention: 604800) is selected. Check the code for the details.
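In code form, the selection logic in file_update() walks the archive list from highest to lowest precision and picks the first archive whose retention covers the age of the point; a condensed sketch with the values from this test:

# Condensed from file_update(): pick the highest-precision archive whose
# retention covers the age of the incoming data point.
archives = [
    {'secondsPerPoint': 10,  'retention': 21600},      # 6 hours
    {'secondsPerPoint': 60,  'retention': 604800},     # 7 days
    {'secondsPerPoint': 600, 'retention': 157784400},  # ~5 years
]

diff = 34560  # now - timestamp, taken from the log above

for i, archive in enumerate(archives):
    if archive['retention'] < diff:
        continue
    lowerArchives = archives[i + 1:]  # propagation targets
    break

# 'archive' is still bound after the loop (the scoping quirk noted in my comments above).
print archive['secondsPerPoint']  # 60 -- the 10-second archive cannot cover a 34560-second-old point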
How does propagation occur?
It is better to explain this with an example (the official Graphite documentation really lacks examples like this).
If the input data point is "1334795700, 0.5", whisper will first update the archive(secondsPerPoint:10) and then propagate the update to the other 2 lower-precision archives. Let's focus on how it updates the archive(secondsPerPoint:60); the steps are listed below, followed by a short sketch in code.
1) It first aligns the timestamp to the lower archive's interval boundary:
lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
2) Find the position of the aligned timestamp (lowerIntervalStart) in the archive(secondsPerPoint:10).
3) Read the next 6 data points from the archive(secondsPerPoint:10). Why 6 points? secondsPerPoint_of_lower_archive / secondsPerPoint_of_higher_archive = 60/10 = 6.
4) Apply the aggregation method to those 6 data points.
5) Update the archive(secondsPerPoint:60) with the timestamp (lowerIntervalStart) and the aggregated value.
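Putting the five steps together, the aggregation core of __propagate() boils down to something like the following simplified sketch; it skips the on-disk offset arithmetic and takes the higher-precision points as an in-memory list (values are illustrative):

def aggregate_for_lower_archive(timestamp, higher_points,
                                higherSecondsPerPoint=10, lowerSecondsPerPoint=60, xff=0.5):
    # Step 1: align the timestamp to the lower archive's interval.
    lowerIntervalStart = timestamp - (timestamp % lowerSecondsPerPoint)
    # Step 3: 60/10 = 6 higher-precision points belong to this lower interval.
    n = lowerSecondsPerPoint // higherSecondsPerPoint
    # Steps 2-3: the real code reads these n points from disk starting at the
    # offset matching lowerIntervalStart; here they are simply passed in.
    knownValues = [v for v in higher_points[:n] if v is not None]
    if not knownValues or float(len(knownValues)) / n < xff:
        return None  # not enough known points to propagate
    # Step 4: apply the aggregation method ('average' in this test).
    aggregateValue = float(sum(knownValues)) / len(knownValues)
    # Step 5: this pair is what gets written into the lower-precision archive.
    return (lowerIntervalStart, aggregateValue)

# Six 10-second points belonging to the minute starting at 1334795700:
print aggregate_for_lower_archive(1334795705, [0.5, 0.4, 0.6, None, 0.5, 0.4])
# -> (1334795700, 0.48)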
---------------------------------------------------
Some terminology of Graphite's Whisper (a round-robin database):
- data point: A float value paired with a timestamp in seconds since the UNIX Epoch (01-01-1970).
- resolution: The number of seconds per data point. For example, in our test statsd flushes the counter ('ramoncounter') every 10 seconds, so from the perspective of whisper it receives 1 data point every 10 seconds and the resolution is 10 seconds. A 10-second resolution is higher than a 60-second resolution.
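As a concrete illustration (using the three archives from ramoncounter.wsp above), resolution and retention are related by retention = secondsPerPoint * points:

# resolution (secondsPerPoint) vs. retention (secondsPerPoint * points)
for secondsPerPoint, points in [(10, 2160), (60, 10080), (600, 262974)]:
    retention = secondsPerPoint * points
    print "resolution %4ds -> %6d points -> retention %8.2f days" % (
        secondsPerPoint, points, retention / 86400.0)
# 10s resolution for 6 hours, 60s for 7 days, 600s for roughly 5 years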