Class: InfluxDB2::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/influxdb2/client/worker.rb

Overview

Worker that manages the queue used for batching writes.

Instance Method Summary collapse

Constructor Details

#initialize(api_client, write_options) ⇒ Worker

Returns a new instance of Worker.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/influxdb2/client/worker.rb', line 25

# Stores the collaborators, creates the internal queues and starts two
# background threads that periodically invoke +_check_background_queue+.
#
# @param api_client [Object] its +closed+ value is polled by both background
#   threads; each loop stops once it is truthy
# @param write_options [Object] must respond to +flush_interval+,
#   +batch_size+ and +batch_abort_on_exception+
def initialize(api_client, write_options)
  @api_client = api_client
  @write_options = write_options

  # @queue is the queue that #push appends payloads to.
  @queue = Queue.new
  # NOTE(review): @queue_event is pre-loaded with a single +true+ below;
  # presumably _check_background_queue uses it as a token/lock — confirm there.
  @queue_event = Queue.new

  @queue_event.push(true)

  # Time-based trigger: sleep for the flush interval, then run a check.
  # flush_interval is divided by 1_000 before being handed to sleep (which
  # takes seconds), i.e. it is treated as milliseconds.
  @thread_flush = Thread.new do
    until api_client.closed
      sleep @write_options.flush_interval.to_f / 1_000
      _check_background_queue
    end
  end
  # Set after the thread has already been started.
  @thread_flush.abort_on_exception = @write_options.batch_abort_on_exception

  # Size-based trigger: poll every 0.01 s and run a check with size: true as
  # soon as the queue holds at least batch_size items.
  @thread_size = Thread.new do
    until api_client.closed
      _check_background_queue(size: true) if @queue.length >= @write_options.batch_size
      sleep 0.01
    end
  end
  # Set after the thread has already been started.
  @thread_size.abort_on_exception = @write_options.batch_abort_on_exception
end

Instance Method Details

#flush_all ⇒ Object



61
62
63
# File 'lib/influxdb2/client/worker.rb', line 61

# Keeps invoking +_check_background_queue+ for as long as +@queue+ still
# reports pending items. Does nothing when the queue is already empty.
#
# @return [nil]
def flush_all
  until @queue.empty?
    _check_background_queue
  end
end

#push(payload) ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/influxdb2/client/worker.rb', line 51

# Adds a payload to the internal queue. Anything that responds to +each+ is
# walked recursively, so nested collections are flattened and every leaf
# element is enqueued individually, in iteration order.
#
# @param payload [Object] a single item, or a (possibly nested) collection
#   of items
# @return [Object] the queue for a single item, otherwise whatever
#   +payload.each+ returns
def push(payload)
  return @queue.push(payload) unless payload.respond_to?(:each)

  payload.each { |entry| push(entry) }
end