class GraphiteAPI::Buffer
Constants
- CHARS_TO_BE_IGNORED
- END_OF_STREAM
- VALID_MESSAGE
Public Class Methods
new(options)
click to toggle source
# File lib/graphite-api/buffer.rb, line 30
# Builds an empty buffer.
#
# options - settings object indexed with symbols (e.g. a Hash); stored as-is.
#           When options[:cache] is truthy, a Cache::Memory is created from
#           the same options; otherwise no cache is set up.
def initialize(options)
  @options  = options
  @queue    = Queue.new
  # Every client id seen for the first time gets its own empty String.
  @streamer = Hash.new { |partials, client| partials[client] = "" }
  @cache    = Cache::Memory.new(options) if options[:cache]
end
Public Instance Methods
inspect()
click to toggle source
# File lib/graphite-api/buffer.rb, line 89
# Short debugging representation of the buffer.
#
# Returns a String containing the object id, the number of queued records
# and the current contents of the streamer.
def inspect
  # The label previously read "@quque#size"; it is spelled "@queue" now.
  "#<GraphiteAPI::Buffer:%s @queue#size=%s @streamer=%s>" % [object_id, queue.size, streamer]
end
new_records?()
click to toggle source
# File lib/graphite-api/buffer.rb, line 85
# Tells whether the queue currently holds anything.
#
# Returns false when the queue is empty, true otherwise.
def new_records?
  return false if queue.empty?

  true
end
pull(format = nil)
click to toggle source
# File lib/graphite-api/buffer.rb, line 65
# Drains queued records, sums their metric values per normalized time and
# metric key, and returns the totals.
#
# format - when :string, each result is the String "name value time";
#          otherwise each result is the Array [name, value, time].
#
# Names are prefixed with #prefix and values are rounded through "%.2f".
# When a cache is present each total is passed through cache.incr first.
#
# Returns a flat Array of results.
def pull(format = nil)
  totals = nested_zero_hash # presumably defaults missing entries to 0 - confirm
  max_records = 1_000_000 # TODO: fix this

  # At most max_records records are popped per call.
  max_records.times do
    break unless new_records?

    record = queue.pop
    slot = normalize_time(record[:time], options[:slice])
    record[:metric].each { |name, amount| totals[slot][name] += amount.to_f }
  end

  totals.flat_map do |slot, metrics|
    metrics.map do |name, total|
      total = cache.incr(slot, name, total) if cache
      row = ["#{prefix}#{name}", ("%.2f" % total).to_f, slot]
      format == :string ? row.join(" ") : row
    end
  end
end
push(obj)
click to toggle source
Add records to the buffer, e.g. push({:metric => {'a' => 10}, :time => Time.now})
# File lib/graphite-api/buffer.rb, line 57
# Appends one record to the queue after logging it through debug.
#
# obj - the record to enqueue, e.g. {:metric => {'a' => 10}, :time => Time.now}
#
# Returns nil.
def push(obj)
  debug([:buffer, :add, obj])
  queue << obj
  nil
end
Also aliased as: <<
stream(message, client_id = nil)
click to toggle source
This method isn't thread-safe; use push when multiple threads write to the buffer.
# File lib/graphite-api/buffer.rb, line 41
# Feeds raw text into the per-client partial message, one character at a
# time, and pushes every completed message that passes validation.
#
# message   - String chunk; tabs are treated as spaces.
# client_id - key under which the partial message is accumulated
#             (default: nil).
def stream(message, client_id = nil)
  normalized = message.gsub(/\t/, ' ')

  normalized.each_char do |char|
    next if invalid_char?(char)

    pending = (streamer[client_id] += char)
    next unless closed_stream?(pending)

    # A closed message is dropped from the streamer whether or not it is valid.
    push(stream_message_to_obj(pending)) if valid_stream_message?(pending)
    streamer.delete(client_id)
  end
end
Private Instance Methods
closed_stream?(string)
click to toggle source
# File lib/graphite-api/buffer.rb, line 105
# Checks whether the last character of string equals END_OF_STREAM.
#
# string - the text accumulated so far.
def closed_stream?(string)
  last_char = string.slice(-1, 1)
  last_char == END_OF_STREAM
end
invalid_char?(char)
click to toggle source
# File lib/graphite-api/buffer.rb, line 101
# Checks whether char is listed in CHARS_TO_BE_IGNORED.
#
# char - a single-character String taken from the incoming stream.
def invalid_char?(char)
  CHARS_TO_BE_IGNORED.include?(char)
end
prefix()
click to toggle source
# File lib/graphite-api/buffer.rb, line 113
# Metric-name prefix built from options[:prefix], computed once and memoized.
#
# Returns the prefix parts joined with "." and followed by a trailing ".",
# or an empty String when options[:prefix] is missing or empty.
def prefix
  @prefix ||= begin
    configured = options[:prefix]
    configured && !configured.empty? ? Array(configured).join('.') + '.' : ""
  end
end
stream_message_to_obj(message)
click to toggle source
# File lib/graphite-api/buffer.rb, line 96
# Converts a whitespace-separated "name value timestamp" line into a record.
#
# message - String holding the three fields.
#
# Returns a Hash of the form {:metric => {name => value}, :time => Time},
# where value stays a String and the timestamp is read with to_i.
def stream_message_to_obj(message)
  name, value, timestamp = message.split
  { :metric => { name => value }, :time => Time.at(timestamp.to_i) }
end
valid_stream_message?(message)
click to toggle source
# File lib/graphite-api/buffer.rb, line 109
# Matches message against VALID_MESSAGE.
#
# message - the completed stream message to check.
#
# Returns the result of =~ (truthy on a match, nil otherwise).
def valid_stream_message?(message)
  message =~ VALID_MESSAGE
end