fluent-plugin-kafka 0.19.0 → 0.19.2

This diff compares the contents of publicly released versions of this package as published to a supported public registry. It is provided for informational purposes only and reflects the changes between those versions as they appear in the registry.
checksums.yaml CHANGED
@@ -1,7 +1,7 @@
  ---
  SHA256:
- metadata.gz: b4a8c37b041fedc3f95046620413e5f7da437557fc19294439585fa89fd5b244
- data.tar.gz: fe4c8cc8df6b8b5b105fbf709044e10752d8b64f321b5a4e290cb7e316e10fe1
+ metadata.gz: 62d114017cabc47f58cd39db1ea21dd8cfb6d76126c4804741e25766a5b4576c
+ data.tar.gz: f5e4ea9adb96a8f2f46077c0ac89b51e1c177ea0933242ed6cbb6472e40577a2
  SHA512:
- metadata.gz: 2ff333ee092e0ffd653ab476acf8b2656b4ca59ea32d6dcc846eb8a174f69d98811272e8a735bfa3bdfac3d5b3753ce499adb5c8a215b3197310b1f4d822364e
- data.tar.gz: ebf6cafbde9635cfc886ee4dce84495353b2d389e447e6188f6a88eb4ca11b19086ce5a12565bac550ef26936f424e4ec5a0fe908fad4b39bea984758ac44720
+ metadata.gz: 80f4d2ce4d23d95837d3b344841339fc517581bc3347e17e5352a135b088756317b970b559eb66f05e0216ce8841c9e374c5ed0fd09e48d22346249ec9aec0b9
+ data.tar.gz: 8d89ff55bf6ac92e5d2c7a3942882a6b307e6fe19ede620bf299d54feb6f2f1d70fcdece731008324a993c96c69f44456a71798c9af879666de9d017b38e39c3
@@ -20,7 +20,7 @@ jobs:
  - { min: '>= 0.12.0', max: '>= 0.12.0' }
  name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }} with rdkafka gem version (min ${{ matrix.rdkafka_versions.min }} max ${{ matrix.rdkafka_versions.max }})
  steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
  - uses: ruby/setup-ruby@v1
  with:
  ruby-version: ${{ matrix.ruby }}
data/ChangeLog CHANGED
@@ -1,3 +1,9 @@
+ Release 0.19.2 - 2023/10/13
+ * out_rdkafka2: Add `discard_kafka_delivery_failed_regex`
+
+ Release 0.19.1 - 2023/09/20
+ * out_rdkafka2: Add `use_default_for_unknown_topic` & `use_default_for_unknown_partition_error`
+
  Release 0.19.0 - 2023/04/26
  * out_kafka2: Add support for AWS IAM authentication
  * in_kafka, in_kafka_group, out_kafka2: Add support for ssl client cert key password
data/README.md CHANGED
@@ -510,11 +510,14 @@ You need to install rdkafka gem.
  partition_key_key (string) :default => 'partition_key'
  message_key_key (string) :default => 'message_key'
  default_topic (string) :default => nil
+ use_default_for_unknown_topic (bool) :default => false
+ use_default_for_unknown_partition_error (bool) :default => false
  default_partition_key (string) :default => nil
  default_message_key (string) :default => nil
  exclude_topic_key (bool) :default => false
  exclude_partition_key (bool) :default => false
  discard_kafka_delivery_failed (bool) :default => false (No discard)
+ discard_kafka_delivery_failed_regex (regexp) :default => nil (No discard)
  use_event_time (bool) :default => false
 
  # same with kafka2
@@ -557,6 +560,9 @@ You need to install rdkafka gem.
  max_enqueue_bytes_per_second (integer) :default => nil
  </match>
 
+ `rdkafka2` supports `discard_kafka_delivery_failed_regex` parameter:
+ - `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`.
+
  If you use v0.12, use `rdkafka` instead.
 
  <match kafka.**>
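The README addition above documents `discard_kafka_delivery_failed_regex` on its own; the two discard options combine as follows: `discard_kafka_delivery_failed` discards events on any delivery failure, while the regexp variant only discards them when the failure message matches the configured pattern, and everything else is re-raised so Fluentd retries the chunk. A minimal standalone sketch of that decision (the helper name and the sample error text are illustrative, not plugin code):

```ruby
# Standalone sketch, not plugin code: how the two discard settings combine.
def classify_delivery_failure(error_message, discard_all:, discard_regex: nil)
  return :discard if discard_all                           # discard_kafka_delivery_failed true
  return :discard if discard_regex&.match?(error_message)  # matches discard_kafka_delivery_failed_regex
  :retry                                                   # otherwise re-raise and let Fluentd retry the chunk
end

classify_delivery_failure("delivery failed: unknown_topic",
                          discard_all: false,
                          discard_regex: /unknown_topic/)  # => :discard
```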
@@ -13,7 +13,7 @@ Gem::Specification.new do |gem|
  gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
  gem.name = "fluent-plugin-kafka"
  gem.require_paths = ["lib"]
- gem.version = '0.19.0'
+ gem.version = '0.19.2'
  gem.required_ruby_version = ">= 2.1.0"
 
  gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
@@ -65,6 +65,8 @@ DESC
  config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic"
  config_param :default_topic, :string, :default => nil,
  :desc => "Default output topic when record doesn't have topic field"
+ config_param :use_default_for_unknown_topic, :bool, :default => false, :desc => "If true, default_topic is used when topic not found"
+ config_param :use_default_for_unknown_partition_error, :bool, :default => false, :desc => "If true, default_topic is used when received unknown_partition error"
  config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key"
  config_param :default_message_key, :string, :default => nil
  config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
@@ -110,6 +112,7 @@ DESC
  config_param :use_event_time, :bool, :default => false, :desc => 'Use fluentd event time for rdkafka timestamp'
  config_param :max_send_limit_bytes, :size, :default => nil
  config_param :discard_kafka_delivery_failed, :bool, :default => false
+ config_param :discard_kafka_delivery_failed_regex, :regexp, :default => nil
  config_param :rdkafka_buffering_max_ms, :integer, :default => nil, :desc => 'Used for queue.buffering.max.ms'
  config_param :rdkafka_buffering_max_messages, :integer, :default => nil, :desc => 'Used for queue.buffering.max.messages'
  config_param :rdkafka_message_max_bytes, :integer, :default => nil, :desc => 'Used for message.max.bytes'
@@ -233,6 +236,9 @@ DESC
  @rdkafka = Rdkafka::Config.new(config)
 
  if @default_topic.nil?
+ if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error
+ raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true"
+ end
  if @chunk_keys.include?(@topic_key) && !@chunk_key_tag
  log.warn "Use '#{@topic_key}' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer #{@topic_key},tag>"
  end
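The new guard ties the fallback options to `default_topic`: enabling `use_default_for_unknown_topic` or `use_default_for_unknown_partition_error` without setting `default_topic` now fails at configure time instead of at write time. A rough standalone sketch of the same check, assuming a hypothetical `validate_fallback_options!` helper and substituting `ArgumentError` for `Fluent::ConfigError` to keep it self-contained:

```ruby
# Standalone sketch of the configure-time guard; names are illustrative.
def validate_fallback_options!(default_topic, use_default_for_unknown_topic, use_default_for_unknown_partition_error)
  return unless default_topic.nil?
  if use_default_for_unknown_topic || use_default_for_unknown_partition_error
    # The plugin raises Fluent::ConfigError here.
    raise ArgumentError, "default_topic must be set when use_default_for_unknown_topic " \
                         "or use_default_for_unknown_partition_error is true"
  end
end

validate_fallback_options!("fallback_topic", true, false)  # passes
begin
  validate_fallback_options!(nil, true, false)             # mirrors the new ConfigError
rescue ArgumentError => e
  puts e.message
end
```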
@@ -456,9 +462,13 @@ DESC
  if @discard_kafka_delivery_failed
  log.warn "Delivery failed. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
  else
- log.warn "Send exception occurred: #{e} at #{e.backtrace.first}"
- # Raise exception to retry sendind messages
- raise e
+ if @discard_kafka_delivery_failed_regex != nil && @discard_kafka_delivery_failed_regex.match?(e.to_s)
+ log.warn "Delivery failed and matched regexp pattern #{@discard_kafka_delivery_failed_regex}. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
+ else
+ log.warn "Send exception occurred: #{e} at #{e.backtrace.first}"
+ # Raise exception to retry sendind messages
+ raise e
+ end
  end
  ensure
  @writing_threads_mutex.synchronize { @writing_threads.delete(Thread.current) }
@@ -466,17 +476,25 @@ DESC
 
  def enqueue_with_retry(producer, topic, record_buf, message_key, partition, headers, time)
  attempt = 0
+ actual_topic = topic
+
  loop do
  begin
  @enqueue_rate.raise_if_limit_exceeded(record_buf.bytesize) if @enqueue_rate
- return producer.produce(topic: topic, payload: record_buf, key: message_key, partition: partition, headers: headers, timestamp: @use_event_time ? Time.at(time) : nil)
+ return producer.produce(topic: actual_topic, payload: record_buf, key: message_key, partition: partition, headers: headers, timestamp: @use_event_time ? Time.at(time) : nil)
  rescue EnqueueRate::LimitExceeded => e
  @enqueue_rate.revert if @enqueue_rate
  duration = e.next_retry_clock - Fluent::Clock.now
  sleep(duration) if duration > 0.0
  rescue Exception => e
  @enqueue_rate.revert if @enqueue_rate
- if e.respond_to?(:code) && e.code == :queue_full
+
+ if !e.respond_to?(:code)
+ raise e
+ end
+
+ case e.code
+ when :queue_full
  if attempt <= @max_enqueue_retries
  log.warn "Failed to enqueue message; attempting retry #{attempt} of #{@max_enqueue_retries} after #{@enqueue_retry_backoff}s"
  sleep @enqueue_retry_backoff
@@ -484,6 +502,25 @@ DESC
  else
  raise "Failed to enqueue message although tried retry #{@max_enqueue_retries} times"
  end
+ # https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#LL309C9-L309C41
+ # RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC
+ when :unknown_topic
+ if @use_default_for_unknown_topic && actual_topic != @default_topic
+ log.debug "'#{actual_topic}' topic not found. Retry with '#{@default_topic}' topic"
+ actual_topic = @default_topic
+ retry
+ end
+ raise e
+ # https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#L305
+ # RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
+ when :unknown_partition
+ if @use_default_for_unknown_partition_error && actual_topic != @default_topic
+ log.debug "failed writing to topic '#{actual_topic}' with error '#{e.to_s}'. Writing message to topic '#{@default_topic}'"
+ actual_topic = @default_topic
+ retry
+ end
+
+ raise e
  else
  raise e
  end
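This hunk replaces the single `:queue_full` check with a `case` on the rdkafka error code and adds a one-shot fallback: on `:unknown_topic` or `:unknown_partition` the write is retried against `default_topic`, and the `actual_topic != @default_topic` guard keeps it from looping if the default topic fails the same way. A simplified standalone sketch of that retry shape (the method and the `produce` block are stand-ins, and the per-option `use_default_for_*` flags are assumed enabled for brevity):

```ruby
# Simplified sketch of the fallback-to-default-topic retry added above.
# `produce` is a stand-in for the real rdkafka producer call; the
# use_default_for_unknown_topic / use_default_for_unknown_partition_error
# checks from the plugin are omitted (assumed enabled) for brevity.
def send_with_topic_fallback(payload, topic, default_topic, &produce)
  actual_topic = topic
  begin
    produce.call(actual_topic, payload)
  rescue => e
    code = e.respond_to?(:code) ? e.code : nil
    # Retry at most once: the guard stops the loop once we are already
    # writing to the default topic.
    if [:unknown_topic, :unknown_partition].include?(code) && actual_topic != default_topic
      actual_topic = default_topic
      retry
    end
    raise
  end
end
```

In the plugin itself each error code additionally checks its own `use_default_for_*` flag before switching topics, and the error is re-raised when the fallback is disabled or has already been tried.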
metadata CHANGED
@@ -1,7 +1,7 @@
  --- !ruby/object:Gem::Specification
  name: fluent-plugin-kafka
  version: !ruby/object:Gem::Version
- version: 0.19.0
+ version: 0.19.2
  platform: ruby
  authors:
  - Hidemasa Togashi
@@ -9,7 +9,7 @@ authors:
  autorequire:
  bindir: bin
  cert_chain: []
- date: 2023-04-26 00:00:00.000000000 Z
+ date: 2023-10-13 00:00:00.000000000 Z
  dependencies:
  - !ruby/object:Gem::Dependency
  name: fluentd