# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 120
# Builds the filter instance. No state is set up here; construction is
# handed to the superclass as-is.
def initialize
  super
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 124
# Validates the plugin settings and prepares the state the filter uses at
# runtime: the metadata caches, the compiled tag / container-name patterns,
# the Kubernetes API client (only when an API URL is configured or can be
# derived from the KUBERNETES_SERVICE_* environment variables), the optional
# watch threads, the +filter_stream+ implementation matching the log source,
# and the compiled annotation patterns.
#
# conf - the configuration element; it is forwarded to the superclass.
#
# Raises Fluent::ConfigError when de_dot is enabled and +de_dot_separator+
# contains a '.', or when the API validity check raises KubeException.
def configure(conf)
  super

  # Loaded here rather than at file load time.
  require 'kubeclient'
  require 'active_support/core_ext/object/blank'
  require 'lru_redux'

  if @de_dot && (@de_dot_separator =~ /\./).present?
    raise Fluent::ConfigError, "Invalid de_dot_separator: cannot be or contain '.'"
  end

  if @include_namespace_id
    # For compatibility, use include_namespace_metadata instead
    @include_namespace_metadata = true
  end

  # A negative TTL is replaced by :none before being handed to the cache.
  if @cache_ttl < 0
    @cache_ttl = :none
  end
  @cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl)
  if @include_namespace_metadata
    @namespace_cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl)
  end

  @tag_to_kubernetes_name_regexp_compiled = Regexp.compile(@tag_to_kubernetes_name_regexp)
  @container_name_to_kubernetes_regexp_compiled = Regexp.compile(@container_name_to_kubernetes_regexp)

  # Use Kubernetes default service account if we're in a pod.
  if @kubernetes_url.nil?
    env_host = ENV['KUBERNETES_SERVICE_HOST']
    env_port = ENV['KUBERNETES_SERVICE_PORT']
    if env_host.present? && env_port.present?
      @kubernetes_url = "https://#{env_host}:#{env_port}/api"
    end
  end

  # Use SSL certificate and bearer token from Kubernetes service account.
  # Explicitly configured ca_file / bearer_token_file values take precedence.
  if Dir.exist?(@secret_dir)
    ca_cert = File.join(@secret_dir, K8_POD_CA_CERT)
    pod_token = File.join(@secret_dir, K8_POD_TOKEN)

    if !@ca_file.present? && File.exist?(ca_cert)
      @ca_file = ca_cert
    end

    if !@bearer_token_file.present? && File.exist?(pod_token)
      @bearer_token_file = pod_token
    end
  end

  if @kubernetes_url.present?
    ssl_options = {
      client_cert: @client_cert.present? ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil,
      client_key: @client_key.present? ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil,
      ca_file: @ca_file,
      verify_ssl: @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
    }

    auth_options = {}

    if @bearer_token_file.present?
      bearer_token = File.read(@bearer_token_file)
      auth_options[:bearer_token] = bearer_token
    end

    @client = Kubeclient::Client.new(@kubernetes_url, @apiVersion,
                                     ssl_options: ssl_options,
                                     auth_options: auth_options)

    begin
      @client.api_valid?
    rescue KubeException => kube_error
      raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{kube_error.message}"
    end

    if @watch
      # Each watcher runs in its own thread; an exception inside a watcher
      # aborts the process instead of being lost with the thread.
      thread = Thread.new(self) { |this| this.start_watch }
      thread.abort_on_exception = true
      if @include_namespace_metadata
        namespace_thread = Thread.new(self) { |this| this.start_namespace_watch }
        namespace_thread.abort_on_exception = true
      end
    end
  end

  # Bind filter_stream on this instance only (its singleton class). Aliasing
  # on the class itself would silently switch the implementation for every
  # other instance of the filter, including ones configured for the other
  # log source.
  if @use_journal
    @merge_json_log_key = 'MESSAGE'
    singleton_class.class_eval { alias_method :filter_stream, :filter_stream_from_journal }
  else
    @merge_json_log_key = 'log'
    singleton_class.class_eval { alias_method :filter_stream, :filter_stream_from_files }
  end

  # Invalid annotation patterns are reported and skipped, not fatal.
  @annotations_regexps = []
  @annotation_match.each do |regexp|
    begin
      @annotations_regexps << Regexp.compile(regexp)
    rescue RegexpError => e
      log.error "Error: invalid regular expression in annotation_match: #{e}"
    end
  end
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 351
# Renames, in place, every key of +h+ that contains a '.' by replacing each
# '.' with @de_dot_separator. Only top-level keys are examined, and a key
# whose value is nil or false is left as it is. The renamed key is always a
# String (the key is converted with to_s before substitution).
#
# h - the Hash to modify.
def de_dot!(h)
  h.keys.each do |key|
    next unless h[key] && key =~ /\./

    value = h.delete(key)
    h[key.to_s.gsub('.', @de_dot_separator)] = value
  end
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 263
# Default pass-through implementation: hands the event stream back
# untouched.
#
# tag - the event tag (unused here).
# es  - the event stream.
#
# Returns +es+ itself.
def filter_stream(tag, es)
  es
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 267
# Filter implementation for file-based logs. The metadata is derived once
# from the tag (via the compiled tag pattern and its named groups
# docker_id, namespace, pod_name and container_name) and merged into every
# record of the stream. When the tag does not match, records pass through
# with at most the JSON-log merge applied.
#
# tag - the event tag to extract the names from.
# es  - the incoming event stream.
#
# Returns a new MultiEventStream holding the resulting records.
def filter_stream_from_files(tag, es)
  output = MultiEventStream.new

  metadata = nil
  if (match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled))
    metadata = {
      'docker' => {
        'container_id' => match_data['docker_id']
      },
      'kubernetes' => get_metadata_for_record(
        match_data['namespace'],
        match_data['pod_name'],
        match_data['container_name']
      )
    }
  end

  es.each do |time, record|
    enriched = @merge_json_log ? merge_json_log(record) : record
    enriched = enriched.merge(metadata) if metadata
    output.add(time, enriched)
  end

  output
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 296
# Filter implementation for journal records. Metadata is derived per record
# from its CONTAINER_NAME (matched against the compiled container-name
# pattern, using the named groups namespace, pod_name and container_name)
# and CONTAINER_ID_FULL fields, then merged into the record. Records that
# cannot be enriched are passed on unchanged; the reason is logged at debug
# level.
#
# tag - the event tag (not used for journal records).
# es  - the incoming event stream.
#
# Returns a new MultiEventStream holding the resulting records.
def filter_stream_from_journal(tag, es)
  output = MultiEventStream.new

  es.each do |time, record|
    record = merge_json_log(record) if @merge_json_log

    metadata = nil
    if record.has_key?('CONTAINER_NAME') && record.has_key?('CONTAINER_ID_FULL')
      match_data = record['CONTAINER_NAME'].match(@container_name_to_kubernetes_regexp_compiled)
      if match_data
        metadata = {
          'docker' => {
            'container_id' => record['CONTAINER_ID_FULL']
          },
          'kubernetes' => get_metadata_for_record(
            match_data['namespace'],
            match_data['pod_name'],
            match_data['container_name']
          )
        }
      else
        log.debug "Error: could not match CONTAINER_NAME from record #{record}"
      end
    elsif record.has_key?('CONTAINER_NAME') && record['CONTAINER_NAME'].start_with?('k8s_')
      # Looks like a Kubernetes container, but the full container id is absent.
      log.debug "Error: no container name and id in record #{record}"
    end

    record = record.merge(metadata) if metadata
    output.add(time, record)
  end

  output
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 227
# Assembles the 'kubernetes' metadata for one record. The three names are
# always included. When a Kubernetes API URL is set, pod metadata (cached
# under "<namespace>_<pod>") is merged in, followed by namespace metadata
# (cached under the namespace name) when namespace metadata is enabled.
# A KubeException raised by the namespace lookup is treated as "no
# namespace metadata".
#
# namespace_name - name of the namespace.
# pod_name       - name of the pod.
# container_name - name of the container.
#
# Returns a Hash with String keys.
def get_metadata_for_record(namespace_name, pod_name, container_name)
  metadata = {
    'container_name' => container_name,
    'namespace_name' => namespace_name,
    'pod_name' => pod_name
  }
  return metadata unless @kubernetes_url.present?

  pod_metadata = @cache.getset("#{namespace_name}_#{pod_name}") do
    get_pod_metadata(namespace_name, pod_name)
  end
  metadata.merge!(pod_metadata) if pod_metadata

  if @include_namespace_metadata
    namespace_metadata = @namespace_cache.getset(namespace_name) do
      begin
        namespace = @client.get_namespace(namespace_name)
        parse_namespace_metadata(namespace) if namespace
      rescue KubeException
        nil
      end
    end
    metadata.merge!(namespace_metadata) if namespace_metadata
  end

  metadata
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 110
# Looks the pod up through the API client and converts it with
# parse_pod_metadata.
#
# namespace_name - namespace the pod lives in.
# pod_name       - name of the pod.
#
# Returns the parsed metadata, or nil when the client returns nothing or
# raises KubeException.
def get_pod_metadata(namespace_name, pod_name)
  pod = @client.get_pod(pod_name, namespace_name)
  pod ? parse_pod_metadata(pod) : nil
rescue KubeException
  nil
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 361
# Selects the annotations whose key (converted with to_s) matches at least
# one of the compiled annotation patterns. Patterns are tried in their
# configured order, so entries are inserted pattern by pattern.
#
# annotations - Hash of annotation key => value.
#
# Returns a new Hash with the matching entries (original keys kept).
def match_annotations(annotations)
  @annotations_regexps.each_with_object({}) do |pattern, matched|
    annotations.each do |name, value|
      matched[name] = value if ::Fluent::StringUtil.match_regexp(pattern, name.to_s)
    end
  end
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 335
# Expands a JSON-encoded log line into the record. When the field named by
# @merge_json_log_key holds a string that, once stripped, starts with '{'
# and ends with '}' and parses as JSON, the parsed keys are merged beneath
# the record (fields already present in the record win). The raw log field
# is then removed unless @preserve_json_log is set.
#
# The record is returned unchanged when the field is absent, when its value
# is not a String (nil, numbers, arrays, ...), when it does not look like a
# JSON object, or when parsing fails.
#
# record - the event record (Hash).
#
# Returns the merged record (a new Hash) or the original record.
def merge_json_log(record)
  return record unless record.has_key?(@merge_json_log_key)

  raw = record[@merge_json_log_key]
  # Anything that is not a String cannot be stripped or parsed; pass it
  # through instead of raising NoMethodError.
  return record unless raw.is_a?(String)

  json = raw.strip
  return record unless json.start_with?('{') && json.end_with?('}')

  begin
    merged = JSON.parse(json).merge(record)
    merged.delete(@merge_json_log_key) unless @preserve_json_log
    merged
  rescue JSON::ParserError
    # Deliberate best effort: a line that merely looks like JSON is kept
    # as an ordinary log line.
    record
  end
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 95
# Extracts the metadata of interest from a namespace object: its uid, its
# labels and the annotations selected by match_annotations. Label and
# annotation keys are stringified and, when de_dot is enabled, de-dotted.
#
# namespace_object - object answering ['metadata'] with 'uid', 'labels'
#                    and 'annotations' entries (labels/annotations may be
#                    nil; they are converted with to_h).
#
# Returns a Hash with 'namespace_id', plus 'namespace_labels' and
# 'namespace_annotations' when those are not empty.
def parse_namespace_metadata(namespace_object)
  namespace_meta = namespace_object['metadata']
  labels = syms_to_strs(namespace_meta['labels'].to_h)
  annotations = match_annotations(syms_to_strs(namespace_meta['annotations'].to_h))

  [labels, annotations].each { |hash| de_dot!(hash) } if @de_dot

  result = { 'namespace_id' => namespace_meta['uid'] }
  result['namespace_labels'] = labels unless labels.empty?
  result['namespace_annotations'] = annotations unless annotations.empty?
  result
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 76
# Extracts the metadata of interest from a pod object: namespace, uid,
# name, labels, node name and the configured API URL, plus the annotations
# selected by match_annotations. Label and annotation keys are stringified
# and, when de_dot is enabled, de-dotted.
#
# pod_object - object answering ['metadata'] (namespace, uid, name, labels,
#              annotations) and ['spec'] (nodeName). Labels/annotations may
#              be nil; they are converted with to_h.
#
# Returns a Hash with String keys; 'annotations' is present only when at
# least one annotation matched.
def parse_pod_metadata(pod_object)
  pod_meta = pod_object['metadata']
  labels = syms_to_strs(pod_meta['labels'].to_h)
  annotations = match_annotations(syms_to_strs(pod_meta['annotations'].to_h))

  [labels, annotations].each { |hash| de_dot!(hash) } if @de_dot

  result = {
    'namespace_name' => pod_meta['namespace'],
    'pod_id' => pod_meta['uid'],
    'pod_name' => pod_meta['name'],
    'labels' => labels,
    'host' => pod_object['spec']['nodeName'],
    'master_url' => @kubernetes_url
  }
  result['annotations'] = annotations unless annotations.empty?
  result
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 404
# Follows namespace change notices from the API client and keeps the
# namespace cache in step with them:
#
# * MODIFIED - refreshes the cached entry, but only when the namespace is
#              already cached.
# * DELETED  - removes the cached entry.
# * other    - ignored.
#
# Notices are no longer echoed to stdout; that was leftover debugging
# output.
def start_namespace_watch
  resource_version = @client.get_namespaces.resourceVersion
  watcher = @client.watch_namespaces(resource_version)

  watcher.each do |notice|
    case notice.type
    when 'MODIFIED'
      cache_key = notice.object['metadata']['name']
      if @namespace_cache[cache_key]
        @namespace_cache[cache_key] = parse_namespace_metadata(notice.object)
      end
    when 'DELETED'
      @namespace_cache.delete(notice.object['metadata']['name'])
    else
      # Don't pay attention to creations, since the created namespace may not
      # be used by any pod on this node.
    end
  end
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 373
# Follows pod change notices from the API client and keeps the pod cache
# (keyed "<namespace>_<pod>") in step with them:
#
# * MODIFIED - refreshes the cached entry, but only when the pod is already
#              cached.
# * DELETED  - removes the cached entry.
# * other    - ignored.
#
# Raises Fluent::ConfigError when the initial pod listing or the watch
# set-up fails with a StandardError; the error's response is appended to
# the message when the error exposes one. Non-standard exceptions
# (Interrupt, SystemExit, NoMemoryError, ...) are deliberately left alone
# so that signals and shutdown are not disguised as configuration errors.
def start_watch
  begin
    resource_version = @client.get_pods.resourceVersion
    watcher = @client.watch_pods(resource_version)
  rescue StandardError => e
    message = "Exception encountered fetching metadata from Kubernetes API endpoint: #{e.message}"
    message += " (#{e.response})" if e.respond_to?(:response)
    raise Fluent::ConfigError, message
  end

  watcher.each do |notice|
    case notice.type
    when 'MODIFIED', 'DELETED'
      pod_meta = notice.object['metadata']
      cache_key = "#{pod_meta['namespace']}_#{pod_meta['name']}"
      if notice.type == 'DELETED'
        @cache.delete(cache_key)
      elsif @cache[cache_key]
        @cache[cache_key] = parse_pod_metadata(notice.object)
      end
    else
      # Don't pay attention to creations, since the created pod may not
      # end up on this node.
    end
  end
end
# File lib/fluent/plugin/filter_kubernetes_metadata.rb, line 61
# Builds a copy of +hsh+ in which every Symbol key is replaced by its
# String form. Nested Hash values are converted recursively; keys of other
# types and all non-Hash values (including arrays) are carried over as
# they are. The input is not modified.
#
# hsh - the Hash to convert.
#
# Returns the new Hash.
def syms_to_strs(hsh)
  hsh.each_pair.each_with_object({}) do |(key, value), converted|
    value = syms_to_strs(value) if value.is_a?(Hash)
    converted[key.is_a?(Symbol) ? key.to_s : key] = value
  end
end