fluent-plugin-kafka 0.18.0 → 0.19.0

This diff represents the content of publicly available package versions that have been released to one of the supported registries. The information contained in this diff is provided for informational purposes only and reflects changes between package versions as they appear in their respective public registries.
checksums.yaml CHANGED
@@ -1,7 +1,7 @@
 ---
 SHA256:
-  metadata.gz: 0a8590f6d34bbdb2faa991bba6e32a1424623c7ddd9609dc1d2ffcbeabaa20e1
-  data.tar.gz: a653d9dba00fa82f18071304be9093cd6d620bf4bda29d5e00a75113d05e8e3e
+  metadata.gz: b4a8c37b041fedc3f95046620413e5f7da437557fc19294439585fa89fd5b244
+  data.tar.gz: fe4c8cc8df6b8b5b105fbf709044e10752d8b64f321b5a4e290cb7e316e10fe1
 SHA512:
-  metadata.gz: 69a784e7d69e2229a036110997b2691143008d4ca786e18415e9a806a6d577b38465a4a3931166418a5b7e022dddbc62d014c0c784277ccaaecc9b6570822564
-  data.tar.gz: 9f14d8d10a45bf390c801c93d79b893f4248ded7e37dc87f8c4453b04158754e1006aa7603874986e8c93812fe899197219abccb3bdb0ae4daacef6d516a0af3
+  metadata.gz: 2ff333ee092e0ffd653ab476acf8b2656b4ca59ea32d6dcc846eb8a174f69d98811272e8a735bfa3bdfac3d5b3753ce499adb5c8a215b3197310b1f4d822364e
+  data.tar.gz: ebf6cafbde9635cfc886ee4dce84495353b2d389e447e6188f6a88eb4ca11b19086ce5a12565bac550ef26936f424e4ec5a0fe908fad4b39bea984758ac44720
data/.github/ISSUE_TEMPLATE/bug_report.yaml CHANGED
@@ -1,5 +1,6 @@
 name: Bug Report
 description: Create a report with a procedure for reproducing the bug
+labels: "waiting-for-triage"
 body:
   - type: markdown
     attributes:
data/.github/ISSUE_TEMPLATE/feature_request.yaml CHANGED
@@ -1,5 +1,6 @@
 name: Feature request
 description: Suggest an idea for this project
+labels: "waiting-for-triage"
 body:
   - type: markdown
     attributes:
data/.github/dependabot.yml ADDED
@@ -0,0 +1,6 @@
+version: 2
+updates:
+  - package-ecosystem: 'github-actions'
+    directory: '/'
+    schedule:
+      interval: 'weekly'
data/.github/workflows/linux.yml CHANGED
@@ -12,12 +12,15 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        ruby: [ '3.1', '3.0', '2.7', '2.6' ]
+        ruby: [ '3.2', '3.1', '3.0', '2.7' ]
         os:
           - ubuntu-latest
-    name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }}
+        rdkafka_versions:
+          - { min: '>= 0.6.0', max: '< 0.12.0' }
+          - { min: '>= 0.12.0', max: '>= 0.12.0' }
+    name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }} with rdkafka gem version (min ${{ matrix.rdkafka_versions.min }} max ${{ matrix.rdkafka_versions.max }})
     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v3
       - uses: ruby/setup-ruby@v1
         with:
           ruby-version: ${{ matrix.ruby }}
@@ -33,6 +36,8 @@ jobs:
       - name: unit testing
        env:
          CI: true
+         RDKAFKA_VERSION_MIN_RANGE: ${{ matrix.rdkafka_versions.min }}
+         RDKAFKA_VERSION_MAX_RANGE: ${{ matrix.rdkafka_versions.max }}
        run: |
          sudo ./ci/prepare-kafka-server.sh
          gem install bundler rake
data/.github/workflows/stale-actions.yml CHANGED
@@ -7,7 +7,7 @@ jobs:
   stale:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/stale@v3
+      - uses: actions/stale@v8
        with:
          repo-token: ${{ secrets.GITHUB_TOKEN }}
          days-before-stale: 90
@@ -18,5 +18,7 @@ jobs:
          close-pr-message: "This PR was automatically closed because of stale in 30 days"
          stale-pr-label: "stale"
          stale-issue-label: "stale"
-         exempt-issue-labels: "bug,enhancement,help wanted"
-         exempt-pr-labels: "bug,enhancement,help wanted"
+         exempt-issue-labels: "bug,enhancement,help wanted,waiting-for-triage"
+         exempt-pr-labels: "bug,enhancement,help wanted,waiting-for-triage"
+         exempt-all-assignees: true
+         exempt-all-milestones: true
data/ChangeLog CHANGED
@@ -1,3 +1,12 @@
+Release 0.19.0 - 2023/04/26
+  * out_kafka2: Add support for AWS IAM authentication
+  * in_kafka, in_kafka_group, out_kafka2: Add support for ssl client cert key password
+  * out_rdkafka2: Mask `ssl_client_cert_key_password` on dumping it to log
+  * out_rdkafka2: Support rdkafka-ruby 0.12
+
+Release 0.18.1 - 2022/08/17
+  * out_kafka2: Fix a bug that it doesn't respect `chunk_limit_records` and `chunk_limit_size`
+
 Release 0.18.0 - 2022/07/21
   * out_kafka2: Keep alive Kafka connections between flushes
   * out_rdkafka2: Enable to set SASL credentials via `username` and `password` parameters
data/Gemfile CHANGED
@@ -3,4 +3,4 @@ source 'https://rubygems.org'
 # Specify your gem's dependencies in fluent-plugin-kafka.gemspec
 gemspec
 
-gem 'rdkafka', '>= 0.6.0' if ENV["USE_RDKAFKA"]
+gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA']
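
These two environment variables line up with the new `rdkafka_versions` CI matrix in linux.yml above; for example, the `{ min: '>= 0.6.0', max: '< 0.12.0' }` matrix entry makes this line behave like `gem 'rdkafka', '>= 0.6.0', '< 0.12.0'`.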
data/README.md CHANGED
@@ -36,6 +36,7 @@ If you want to use zookeeper related parameters, you also need to install zookee
 - ssl_ca_cert
 - ssl_client_cert
 - ssl_client_cert_key
+- ssl_client_cert_key_password
 - ssl_ca_certs_from_system
 
 Set path to SSL related files. See [Encryption and Authentication using SSL](https://github.com/zendesk/ruby-kafka#encryption-and-authentication-using-ssl) for more detail.
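
As a minimal sketch of how the new parameter fits alongside the existing SSL options (illustrative only, not part of the diff; the broker address, topic, and file paths are placeholders):

    <match app.**>
      @type kafka2
      brokers broker1:9093
      default_topic events
      ssl_ca_cert /path/to/ca.crt
      ssl_client_cert /path/to/client.crt
      ssl_client_cert_key /path/to/client.key
      ssl_client_cert_key_password YOUR_KEY_PASSPHRASE
    </match>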
@@ -187,6 +188,10 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
   @type kafka2
 
   brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
+
+  # Kafka topic, placeholders are supported. Chunk keys are required in the Buffer section in order for placeholders
+  # to work.
+  topic (string) :default => nil
   topic_key (string) :default => 'topic'
   partition_key (string) :default => 'partition'
   partition_key_key (string) :default => 'partition_key'
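
A complete working configuration for tag-based topic placeholders is included later in this diff as examples/out_kafka2/dynamic_topic_based_on_tag.conf.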
@@ -208,6 +213,16 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
   partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
   share_producer (bool) :default => false
 
+  # If you intend to rely on AWS IAM auth to MSK with long lived credentials
+  # https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
+  #
+  # For AWS STS support, see status in
+  # - https://github.com/zendesk/ruby-kafka/issues/944
+  # - https://github.com/zendesk/ruby-kafka/pull/951
+  sasl_aws_msk_iam_access_key_id (string) :default => nil
+  sasl_aws_msk_iam_secret_key_id (string) :default => nil
+  sasl_aws_msk_iam_aws_region (string) :default => nil
+
   <format>
     @type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
   </format>
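
As a minimal sketch of the new MSK IAM parameters in an out_kafka2 section (illustrative only, not part of the diff; the endpoint, credentials, and region are placeholders, and port 9098 assumes MSK's IAM listener):

    <match app.**>
      @type kafka2
      brokers b-1.mycluster.kafka.us-east-1.amazonaws.com:9098
      default_topic events
      sasl_aws_msk_iam_access_key_id YOUR_ACCESS_KEY_ID
      sasl_aws_msk_iam_secret_key_id YOUR_SECRET_ACCESS_KEY
      sasl_aws_msk_iam_aws_region us-east-1
      <format>
        @type json
      </format>
    </match>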
@@ -243,13 +258,12 @@ ruby-kafka's log is routed to fluentd log so you can see ruby-kafka's log in flu
 
 Supports the following ruby-kafka producer options.
 
-- max_send_retries - default: 1 - Number of times to retry sending of messages to a leader.
+- max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
 - required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
 - ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
 - compression_codec - default: nil - The codec the producer uses to compress messages.
 - max_send_limit_bytes - default: nil - Max byte size to send message to avoid MessageSizeTooLarge. For example, if you set 1000000 (message.max.bytes in kafka), messages larger than 1000000 bytes will be dropped.
 - discard_kafka_delivery_failed - default: false - discard the record where [Kafka::DeliveryFailed](http://www.rubydoc.info/gems/ruby-kafka/Kafka/DeliveryFailed) occurred
-- monitoring_list - default: [] - library to be used to monitor. statsd and datadog are supported
 
 If you want to know about detail of monitoring, see also https://github.com/zendesk/ruby-kafka#monitoring
 
@@ -420,6 +434,16 @@ Support of fluentd v0.12 has ended. `kafka_buffered` will be an alias of `kafka2
   monitoring_list (array) :default => []
 </match>
 
+`kafka_buffered` supports the following `ruby-kafka` parameters:
+
+- max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
+- required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
+- ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
+- compression_codec - default: nil - The codec the producer uses to compress messages.
+- max_send_limit_bytes - default: nil - Max byte size to send message to avoid MessageSizeTooLarge. For example, if you set 1000000 (message.max.bytes in kafka), messages larger than 1000000 bytes will be dropped.
+- discard_kafka_delivery_failed - default: false - discard the record where [Kafka::DeliveryFailed](http://www.rubydoc.info/gems/ruby-kafka/Kafka/DeliveryFailed) occurred
+- monitoring_list - default: [] - library to be used to monitor. statsd and datadog are supported
+
 `kafka_buffered` has two additional parameters:
 
 - kafka_agg_max_bytes - default: 4096 - Maximum value of total message size to be included in one batch transmission.
data/examples/README.md ADDED
@@ -0,0 +1,3 @@
+# Examples
+
+This directory contains example Fluentd config for this plugin.
data/examples/out_kafka2/dynamic_topic_based_on_tag.conf ADDED
@@ -0,0 +1,32 @@
+<source>
+  @type sample
+  sample {"hello": "world"}
+  rate 7000
+  tag sample.hello.world
+</source>
+
+<match sample.**>
+  @type kafka2
+
+  brokers "broker:29092"
+
+  # Writes to topic `events.sample.hello.world`
+  topic "events.${tag}"
+
+  # Writes to topic `hello.world`
+  # topic "${tag[1]}.${tag[2]}"
+
+  <format>
+    @type json
+  </format>
+
+  <buffer tag>
+    flush_at_shutdown true
+    flush_mode interval
+    flush_interval 1s
+    chunk_limit_size 3MB
+    chunk_full_threshold 1
+    total_limit_size 1024MB
+    overflow_action block
+  </buffer>
+</match>
data/examples/out_kafka2/protobuf-formatter.conf ADDED
@@ -0,0 +1,23 @@
+<source>
+  @type sample
+  sample {"hello": "world", "some_record":{"event":"message"}}
+  rate 7000
+  tag sample.hello.world
+</source>
+
+<match sample.**>
+  @type kafka2
+
+  brokers "broker:29092"
+
+  record_key "some_record"
+  default_topic "events"
+
+  <format>
+    # requires the fluent-plugin-formatter-protobuf gem
+    # see its docs for full usage
+    @type protobuf
+    class_name SomeRecord
+    include_paths ["/opt/fluent-plugin-formatter-protobuf/some_record_pb.rb"]
+  </format>
+</match>
data/examples/out_kafka2/record_key.conf ADDED
@@ -0,0 +1,31 @@
+<source>
+  @type sample
+  sample {"hello": "world", "some_record":{"event":"message"}}
+  rate 7000
+  tag sample.hello.world
+</source>
+
+<match sample.**>
+  @type kafka2
+
+  brokers "broker:29092"
+
+  # {"event": "message"} will be formatted and sent to Kafka
+  record_key "some_record"
+
+  default_topic "events"
+
+  <format>
+    @type json
+  </format>
+
+  <buffer>
+    flush_at_shutdown true
+    flush_mode interval
+    flush_interval 1s
+    chunk_limit_size 3MB
+    chunk_full_threshold 1
+    total_limit_size 1024MB
+    overflow_action block
+  </buffer>
+</match>
data/fluent-plugin-kafka.gemspec CHANGED
@@ -13,12 +13,12 @@ Gem::Specification.new do |gem|
   gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
   gem.name = "fluent-plugin-kafka"
   gem.require_paths = ["lib"]
-  gem.version = '0.18.0'
+  gem.version = '0.19.0'
   gem.required_ruby_version = ">= 2.1.0"
 
   gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
   gem.add_dependency 'ltsv'
-  gem.add_dependency 'ruby-kafka', '>= 1.4.0', '< 2'
+  gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
   gem.add_development_dependency "rake", ">= 0.9.2"
   gem.add_development_dependency "test-unit", ">= 3.0.8"
   gem.add_development_dependency "test-unit-rr", "~> 1.0"
data/lib/fluent/plugin/in_kafka.rb CHANGED
@@ -200,16 +200,19 @@ class Fluent::KafkaInput < Fluent::Input
     if @scram_mechanism != nil && @username != nil && @password != nil
       @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+                         ssl_client_cert_key_password: @ssl_client_cert_key_password,
                          ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
                          sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
     elsif @username != nil && @password != nil
       @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+                         ssl_client_cert_key_password: @ssl_client_cert_key_password,
                          ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password,
                          sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
     else
       @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+                         ssl_client_cert_key_password: @ssl_client_cert_key_password,
                          ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
                          ssl_verify_hostname: @ssl_verify_hostname)
     end
data/lib/fluent/plugin/in_kafka_group.rb CHANGED
@@ -188,16 +188,19 @@ class Fluent::KafkaGroupInput < Fluent::Input
     if @scram_mechanism != nil && @username != nil && @password != nil
       @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+                         ssl_client_cert_key_password: @ssl_client_cert_key_password,
                          ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
                          sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
     elsif @username != nil && @password != nil
       @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+                         ssl_client_cert_key_password: @ssl_client_cert_key_password,
                          ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password,
                          sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
     else
       @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+                         ssl_client_cert_key_password: @ssl_client_cert_key_password,
                          ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
                          ssl_verify_hostname: @ssl_verify_hostname)
     end
data/lib/fluent/plugin/kafka_plugin_util.rb CHANGED
@@ -1,5 +1,18 @@
 module Fluent
   module KafkaPluginUtil
+    module AwsIamSettings
+      def self.included(klass)
+        klass.instance_eval do
+          config_param :sasl_aws_msk_iam_access_key_id, :string, :default => nil, secret: true,
+                       desc: "AWS access key Id for IAM authentication to MSK."
+          config_param :sasl_aws_msk_iam_secret_key_id, :string, :default => nil, secret: true,
+                       desc: "AWS access key secret for IAM authentication to MSK."
+          config_param :sasl_aws_msk_iam_aws_region, :string, :default => nil,
+                       desc: "AWS region for IAM authentication to MSK."
+        end
+      end
+    end
+
     module SSLSettings
       def self.included(klass)
         klass.instance_eval {
@@ -10,6 +23,8 @@ module Fluent
           :desc => "a PEM encoded client cert to use with an SSL connection. Must be used in combination with ssl_client_cert_key."
         config_param :ssl_client_cert_key, :string, :default => nil,
           :desc => "a PEM encoded client cert key to use with an SSL connection. Must be used in combination with ssl_client_cert."
+        config_param :ssl_client_cert_key_password, :string, :default => nil, secret: true,
+          :desc => "a PEM encoded client cert key password to use with an SSL connection."
         config_param :ssl_client_cert_chain, :string, :default => nil,
           :desc => "an extra PEM encoded cert to use with an SSL connection."
         config_param :ssl_ca_certs_from_system, :bool, :default => false,
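
Note that the AWS key parameters and the new ssl_client_cert_key_password are declared with secret: true, so Fluentd masks their values when the configuration is dumped to the log; this is the masking fix called out in the 0.19.0 ChangeLog entry above.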
data/lib/fluent/plugin/kafka_producer_ext.rb CHANGED
@@ -38,9 +38,15 @@ module Kafka
 end
 
 # for out_kafka2
+# Majority (if not all) of this code is lifted from https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb
+# with the main difference where we have removed any checks regarding max_buffer_bytesize and max_buffer_size.
+# The reason for doing this is to provide a better UX for our users where they only need to set those bounds in
+# the Buffer section using `chunk_limit_size` and `chunk_limit_records`.
+#
+# We should reconsider this in the future in case the `ruby-kafka` library drastically changes its internals.
 module Kafka
   class Client
-    def topic_producer(topic, compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60)
+    def custom_producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60)
       cluster = initialize_cluster
       compressor = Compressor.new(
         codec_name: compression_codec,
@@ -57,8 +63,7 @@ module Kafka
         transactional_timeout: transactional_timeout,
       )
 
-      TopicProducer.new(topic,
-        cluster: cluster,
+      CustomProducer.new(cluster: cluster,
         transaction_manager: transaction_manager,
         logger: @logger,
         instrumenter: @instrumenter,
@@ -74,8 +79,8 @@ module Kafka
     end
   end
 
-  class TopicProducer
-    def initialize(topic, cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:)
+  class CustomProducer
+    def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:)
       @cluster = cluster
       @transaction_manager = transaction_manager
       @logger = logger
@@ -88,10 +93,6 @@ module Kafka
       @max_buffer_bytesize = max_buffer_bytesize
       @compressor = compressor
       @partitioner = partitioner
-
-      @topic = topic
-      @cluster.add_target_topics(Set.new([topic]))
-
       # A buffer organized by topic/partition.
       @buffer = MessageBuffer.new
 
@@ -99,12 +100,12 @@ module Kafka
       @pending_message_queue = PendingMessageQueue.new
     end
 
-    def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now)
+    def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now, topic: nil)
       message = PendingMessage.new(
         value: value,
         key: key,
         headers: headers,
-        topic: @topic,
+        topic: topic,
         partition: partition,
         partition_key: partition_key,
         create_time: create_time
@@ -245,12 +246,13 @@ module Kafka
 
     def assign_partitions!
       failed_messages = []
-      partition_count = @cluster.partitions_for(@topic).count
 
       @pending_message_queue.each do |message|
        partition = message.partition
 
        begin
+          partition_count = @cluster.partitions_for(message.topic).count
+
          if partition.nil?
            partition = @partitioner.call(partition_count, message)
          end
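
Because CustomProducer no longer pins a topic at construction time, one producer can serve every topic a chunk resolves to: the topic now travels with each message via the new `topic:` keyword on `produce`, and `assign_partitions!` looks up the partition count per message topic instead of once per producer.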
data/lib/fluent/plugin/out_kafka2.rb CHANGED
@@ -95,6 +95,7 @@ DESC
     config_set_default :@type, 'json'
   end
 
+  include Fluent::KafkaPluginUtil::AwsIamSettings
   include Fluent::KafkaPluginUtil::SSLSettings
   include Fluent::KafkaPluginUtil::SaslSettings
 
@@ -113,25 +114,47 @@ DESC
   def refresh_client(raise_error = true)
     begin
       logger = @get_kafka_client_log ? log : nil
+      use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
       if @scram_mechanism != nil && @username != nil && @password != nil
-        @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
-                           ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
-                           ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
-                           sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
-                           partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
+        sasl_params = {
+          sasl_scram_username: @username,
+          sasl_scram_password: @password,
+          sasl_scram_mechanism: @scram_mechanism,
+        }
       elsif @username != nil && @password != nil
-        @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
-                           ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
-                           ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
-                           ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
-                           partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
+        sasl_params = {
+          sasl_plain_username: @username,
+          sasl_plain_password: @password,
+        }
+      elsif use_long_lived_aws_credentials
+        sasl_params = {
+          sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id,
+          sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id,
+          sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region,
+        }
       else
-        @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
-                           ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
-                           ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,
-                           ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
-                           partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
+        sasl_params = {
+          sasl_gssapi_principal: @principal,
+          sasl_gssapi_keytab: @keytab,
+        }
       end
+      @kafka = Kafka.new(
+        seed_brokers: @seed_brokers,
+        client_id: @client_id,
+        logger: logger,
+        connect_timeout: @connect_timeout,
+        socket_timeout: @socket_timeout,
+        ssl_ca_cert_file_path: @ssl_ca_cert,
+        ssl_client_cert: read_ssl_file(@ssl_client_cert),
+        ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+        ssl_client_cert_key_password: @ssl_client_cert_key_password,
+        ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
+        ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
+        ssl_verify_hostname: @ssl_verify_hostname,
+        resolve_seed_brokers: @resolve_seed_brokers,
+        partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function),
+        sasl_over_ssl: @sasl_over_ssl,
+        **sasl_params)
       log.info "initialized kafka producer: #{@client_id}"
     rescue Exception => e
       if raise_error # During startup, error should be reported to engine and stop its phase for safety.
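
Note the branch order: SASL/SCRAM takes precedence over SASL/PLAIN, then the new long-lived AWS MSK IAM credentials, with GSSAPI as the fallback. All four branches now feed a single Kafka.new call, which is also where the new ssl_client_cert_key_password option is wired in.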
@@ -207,7 +230,7 @@ DESC
   end
 
   def create_producer
-    @kafka.producer(**@producer_opts)
+    @kafka.custom_producer(**@producer_opts)
   end
 
   def start
data/lib/fluent/plugin/out_rdkafka2.rb CHANGED
@@ -5,9 +5,37 @@ require 'fluent/plugin/kafka_plugin_util'
 
 require 'rdkafka'
 
+# This is required for `rdkafka` version >= 0.12.0
+# Overriding the close method in order to provide a time limit for when it should be forcibly closed
+class Rdkafka::Producer::Client
+  # return false if producer is forcefully closed, otherwise return true
+  def close(timeout=nil)
+    return unless @native
+
+    # Indicate to polling thread that we're closing
+    @polling_thread[:closing] = true
+    # Wait for the polling thread to finish up
+    thread = @polling_thread.join(timeout)
+
+    Rdkafka::Bindings.rd_kafka_destroy(@native)
+
+    @native = nil
+
+    return !thread.nil?
+  end
+end
+
 class Rdkafka::Producer
   # return false if producer is forcefully closed, otherwise return true
   def close(timeout = nil)
+    rdkafka_version = Rdkafka::VERSION || '0.0.0'
+    # Rdkafka version >= 0.12.0 changed its internals
+    if Gem::Version::create(rdkafka_version) >= Gem::Version.create('0.12.0')
+      ObjectSpace.undefine_finalizer(self)
+
+      return @client.close(timeout)
+    end
+
     @closing = true
     # Wait for the polling thread to finish up
     # If the broker isn't alive, the thread doesn't exit
@@ -95,7 +123,6 @@ DESC
   config_param :max_enqueue_bytes_per_second, :size, :default => nil, :desc => 'The maximum number of enqueueing bytes per second'
 
   config_param :service_name, :string, :default => nil, :desc => 'Used for sasl.kerberos.service.name'
-  config_param :ssl_client_cert_key_password, :string, :default => nil, :desc => 'Used for ssl.key.password'
 
   config_section :buffer do
     config_set_default :chunk_keys, ["topic"]
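
The local ssl_client_cert_key_password definition is removed because out_rdkafka2 now inherits the parameter from the shared Fluent::KafkaPluginUtil::SSLSettings module shown earlier, where it is declared with secret: true and therefore masked in configuration dumps.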
metadata CHANGED
@@ -1,7 +1,7 @@
 --- !ruby/object:Gem::Specification
 name: fluent-plugin-kafka
 version: !ruby/object:Gem::Version
-  version: 0.18.0
+  version: 0.19.0
 platform: ruby
 authors:
 - Hidemasa Togashi
@@ -9,7 +9,7 @@ authors:
 autorequire:
 bindir: bin
 cert_chain: []
-date: 2022-07-21 00:00:00.000000000 Z
+date: 2023-04-26 00:00:00.000000000 Z
 dependencies:
 - !ruby/object:Gem::Dependency
   name: fluentd
@@ -51,7 +51,7 @@ dependencies:
     requirements:
     - - ">="
       - !ruby/object:Gem::Version
-        version: 1.4.0
+        version: 1.5.0
     - - "<"
       - !ruby/object:Gem::Version
         version: '2'
@@ -61,7 +61,7 @@ dependencies:
     requirements:
    - - ">="
      - !ruby/object:Gem::Version
-        version: 1.4.0
+        version: 1.5.0
    - - "<"
      - !ruby/object:Gem::Version
        version: '2'
@@ -146,6 +146,7 @@ files:
 - ".github/ISSUE_TEMPLATE/bug_report.yaml"
 - ".github/ISSUE_TEMPLATE/config.yml"
 - ".github/ISSUE_TEMPLATE/feature_request.yaml"
+- ".github/dependabot.yml"
 - ".github/workflows/linux.yml"
 - ".github/workflows/stale-actions.yml"
 - ".gitignore"
@@ -155,6 +156,10 @@ files:
 - README.md
 - Rakefile
 - ci/prepare-kafka-server.sh
+- examples/README.md
+- examples/out_kafka2/dynamic_topic_based_on_tag.conf
+- examples/out_kafka2/protobuf-formatter.conf
+- examples/out_kafka2/record_key.conf
 - fluent-plugin-kafka.gemspec
 - lib/fluent/plugin/in_kafka.rb
 - lib/fluent/plugin/in_kafka_group.rb