fluent-plugin-kafka 0.19.0 → 0.19.2
This diff represents the content of publicly available package versions that have been released to one of the supported registries. The information contained in this diff is provided for informational purposes only and reflects changes between package versions as they appear in their respective public registries.
- checksums.yaml +4 -4
- data/.github/workflows/linux.yml +1 -1
- data/ChangeLog +6 -0
- data/README.md +6 -0
- data/fluent-plugin-kafka.gemspec +1 -1
- data/lib/fluent/plugin/out_rdkafka2.rb +42 -5
- metadata +2 -2
checksums.yaml
CHANGED
@@ -1,7 +1,7 @@
|
|
1
1
|
---
|
2
2
|
SHA256:
|
3
|
-
metadata.gz:
|
4
|
-
data.tar.gz:
|
3
|
+
metadata.gz: 62d114017cabc47f58cd39db1ea21dd8cfb6d76126c4804741e25766a5b4576c
|
4
|
+
data.tar.gz: f5e4ea9adb96a8f2f46077c0ac89b51e1c177ea0933242ed6cbb6472e40577a2
|
5
5
|
SHA512:
|
6
|
-
metadata.gz:
|
7
|
-
data.tar.gz:
|
6
|
+
metadata.gz: 80f4d2ce4d23d95837d3b344841339fc517581bc3347e17e5352a135b088756317b970b559eb66f05e0216ce8841c9e374c5ed0fd09e48d22346249ec9aec0b9
|
7
|
+
data.tar.gz: 8d89ff55bf6ac92e5d2c7a3942882a6b307e6fe19ede620bf299d54feb6f2f1d70fcdece731008324a993c96c69f44456a71798c9af879666de9d017b38e39c3
|
data/.github/workflows/linux.yml
CHANGED
@@ -20,7 +20,7 @@ jobs:
|
|
20
20
|
- { min: '>= 0.12.0', max: '>= 0.12.0' }
|
21
21
|
name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }} with rdkafka gem version (min ${{ matrix.rdkafka_versions.min }} max ${{ matrix.rdkafka_versions.max }})
|
22
22
|
steps:
|
23
|
-
- uses: actions/checkout@
|
23
|
+
- uses: actions/checkout@v4
|
24
24
|
- uses: ruby/setup-ruby@v1
|
25
25
|
with:
|
26
26
|
ruby-version: ${{ matrix.ruby }}
|
data/ChangeLog
CHANGED
@@ -1,3 +1,9 @@
|
|
1
|
+
Release 0.19.2 - 2023/10/13
|
2
|
+
* out_rdkafka2: Add `discard_kafka_delivery_failed_regex`
|
3
|
+
|
4
|
+
Release 0.19.1 - 2023/09/20
|
5
|
+
* out_rdkafka2: Add `use_default_for_unknown_topic` & `use_default_for_unknown_partition_error`
|
6
|
+
|
1
7
|
Release 0.19.0 - 2023/04/26
|
2
8
|
* out_kafka2: Add support for AWS IAM authentication
|
3
9
|
* in_kafka, in_kafka_group, out_kafka2: Add support for ssl client cert key password
|
data/README.md
CHANGED
@@ -510,11 +510,14 @@ You need to install rdkafka gem.
|
|
510
510
|
partition_key_key (string) :default => 'partition_key'
|
511
511
|
message_key_key (string) :default => 'message_key'
|
512
512
|
default_topic (string) :default => nil
|
513
|
+
use_default_for_unknown_topic (bool) :default => false
|
514
|
+
use_default_for_unknown_partition_error (bool) :default => false
|
513
515
|
default_partition_key (string) :default => nil
|
514
516
|
default_message_key (string) :default => nil
|
515
517
|
exclude_topic_key (bool) :default => false
|
516
518
|
exclude_partition_key (bool) :default => false
|
517
519
|
discard_kafka_delivery_failed (bool) :default => false (No discard)
|
520
|
+
discard_kafka_delivery_failed_regex (regexp) :default => nil (No discard)
|
518
521
|
use_event_time (bool) :default => false
|
519
522
|
|
520
523
|
# same with kafka2
|
@@ -557,6 +560,9 @@ You need to install rdkafka gem.
|
|
557
560
|
max_enqueue_bytes_per_second (integer) :default => nil
|
558
561
|
</match>
|
559
562
|
|
563
|
+
`rdkafka2` supports `discard_kafka_delivery_failed_regex` parameter:
|
564
|
+
- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`.
|
565
|
+
|
560
566
|
If you use v0.12, use `rdkafka` instead.
|
561
567
|
|
562
568
|
<match kafka.**>
|
data/fluent-plugin-kafka.gemspec
CHANGED
@@ -13,7 +13,7 @@ Gem::Specification.new do |gem|
|
|
13
13
|
gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
|
14
14
|
gem.name = "fluent-plugin-kafka"
|
15
15
|
gem.require_paths = ["lib"]
|
16
|
-
gem.version = '0.19.0'
|
16
|
+
gem.version = '0.19.2'
|
17
17
|
gem.required_ruby_version = ">= 2.1.0"
|
18
18
|
|
19
19
|
gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
|
data/lib/fluent/plugin/out_rdkafka2.rb
CHANGED
@@ -65,6 +65,8 @@ DESC
|
|
65
65
|
config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic"
|
66
66
|
config_param :default_topic, :string, :default => nil,
|
67
67
|
:desc => "Default output topic when record doesn't have topic field"
|
68
|
+
config_param :use_default_for_unknown_topic, :bool, :default => false, :desc => "If true, default_topic is used when topic not found"
|
69
|
+
config_param :use_default_for_unknown_partition_error, :bool, :default => false, :desc => "If true, default_topic is used when received unknown_partition error"
|
68
70
|
config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key"
|
69
71
|
config_param :default_message_key, :string, :default => nil
|
70
72
|
config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
|
@@ -110,6 +112,7 @@ DESC
|
|
110
112
|
config_param :use_event_time, :bool, :default => false, :desc => 'Use fluentd event time for rdkafka timestamp'
|
111
113
|
config_param :max_send_limit_bytes, :size, :default => nil
|
112
114
|
config_param :discard_kafka_delivery_failed, :bool, :default => false
|
115
|
+
config_param :discard_kafka_delivery_failed_regex, :regexp, :default => nil
|
113
116
|
config_param :rdkafka_buffering_max_ms, :integer, :default => nil, :desc => 'Used for queue.buffering.max.ms'
|
114
117
|
config_param :rdkafka_buffering_max_messages, :integer, :default => nil, :desc => 'Used for queue.buffering.max.messages'
|
115
118
|
config_param :rdkafka_message_max_bytes, :integer, :default => nil, :desc => 'Used for message.max.bytes'
|
@@ -233,6 +236,9 @@ DESC
|
|
233
236
|
@rdkafka = Rdkafka::Config.new(config)
|
234
237
|
|
235
238
|
if @default_topic.nil?
|
239
|
+
if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error
|
240
|
+
raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true"
|
241
|
+
end
|
236
242
|
if @chunk_keys.include?(@topic_key) && !@chunk_key_tag
|
237
243
|
log.warn "Use '#{@topic_key}' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer #{@topic_key},tag>"
|
238
244
|
end
|
@@ -456,9 +462,13 @@ DESC
|
|
456
462
|
if @discard_kafka_delivery_failed
|
457
463
|
log.warn "Delivery failed. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
|
458
464
|
else
|
459
|
-
|
460
|
-
|
461
|
-
|
465
|
+
if @discard_kafka_delivery_failed_regex != nil && @discard_kafka_delivery_failed_regex.match?(e.to_s)
|
466
|
+
log.warn "Delivery failed and matched regexp pattern #{@discard_kafka_delivery_failed_regex}. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
|
467
|
+
else
|
468
|
+
log.warn "Send exception occurred: #{e} at #{e.backtrace.first}"
|
469
|
+
# Raise exception to retry sending messages
|
470
|
+
raise e
|
471
|
+
end
|
462
472
|
end
|
463
473
|
ensure
|
464
474
|
@writing_threads_mutex.synchronize { @writing_threads.delete(Thread.current) }
|
@@ -466,17 +476,25 @@ DESC
|
|
466
476
|
|
467
477
|
def enqueue_with_retry(producer, topic, record_buf, message_key, partition, headers, time)
|
468
478
|
attempt = 0
|
479
|
+
actual_topic = topic
|
480
|
+
|
469
481
|
loop do
|
470
482
|
begin
|
471
483
|
@enqueue_rate.raise_if_limit_exceeded(record_buf.bytesize) if @enqueue_rate
|
472
|
-
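return producer.produce(topic: topic, payload: record_buf, key: message_key, partition: partition, headers: headers, timestamp: @use_event_time ? Time.at(time) : nil)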
|
484
|
+
return producer.produce(topic: actual_topic, payload: record_buf, key: message_key, partition: partition, headers: headers, timestamp: @use_event_time ? Time.at(time) : nil)
|
473
485
|
rescue EnqueueRate::LimitExceeded => e
|
474
486
|
@enqueue_rate.revert if @enqueue_rate
|
475
487
|
duration = e.next_retry_clock - Fluent::Clock.now
|
476
488
|
sleep(duration) if duration > 0.0
|
477
489
|
rescue Exception => e
|
478
490
|
@enqueue_rate.revert if @enqueue_rate
|
479
|
-
|
491
|
+
|
492
|
+
if !e.respond_to?(:code)
|
493
|
+
raise e
|
494
|
+
end
|
495
|
+
|
496
|
+
case e.code
|
497
|
+
when :queue_full
|
480
498
|
if attempt <= @max_enqueue_retries
|
481
499
|
log.warn "Failed to enqueue message; attempting retry #{attempt} of #{@max_enqueue_retries} after #{@enqueue_retry_backoff}s"
|
482
500
|
sleep @enqueue_retry_backoff
|
@@ -484,6 +502,25 @@ DESC
|
|
484
502
|
else
|
485
503
|
raise "Failed to enqueue message although tried retry #{@max_enqueue_retries} times"
|
486
504
|
end
|
505
|
+
# https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#LL309C9-L309C41
|
506
|
+
# RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC
|
507
|
+
when :unknown_topic
|
508
|
+
if @use_default_for_unknown_topic && actual_topic != @default_topic
|
509
|
+
log.debug "'#{actual_topic}' topic not found. Retry with '#{@default_topic}' topic"
|
510
|
+
actual_topic = @default_topic
|
511
|
+
retry
|
512
|
+
end
|
513
|
+
raise e
|
514
|
+
# https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#L305
|
515
|
+
# RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
|
516
|
+
when :unknown_partition
|
517
|
+
if @use_default_for_unknown_partition_error && actual_topic != @default_topic
|
518
|
+
log.debug "failed writing to topic '#{actual_topic}' with error '#{e.to_s}'. Writing message to topic '#{@default_topic}'"
|
519
|
+
actual_topic = @default_topic
|
520
|
+
retry
|
521
|
+
end
|
522
|
+
|
523
|
+
raise e
|
487
524
|
else
|
488
525
|
raise e
|
489
526
|
end
|
metadata
CHANGED
@@ -1,7 +1,7 @@
|
|
1
1
|
--- !ruby/object:Gem::Specification
|
2
2
|
name: fluent-plugin-kafka
|
3
3
|
version: !ruby/object:Gem::Version
|
4
|
-
version: 0.19.0
|
4
|
+
version: 0.19.2
|
5
5
|
platform: ruby
|
6
6
|
authors:
|
7
7
|
- Hidemasa Togashi
|
@@ -9,7 +9,7 @@ authors:
|
|
9
9
|
autorequire:
|
10
10
|
bindir: bin
|
11
11
|
cert_chain: []
|
12
|
-
date: 2023-04-26 00:00:00.000000000 Z
|
12
|
+
date: 2023-10-13 00:00:00.000000000 Z
|
13
13
|
dependencies:
|
14
14
|
- !ruby/object:Gem::Dependency
|
15
15
|
name: fluentd
|