input plugin kafka
output plugin postgresql
when i consuming event id and timestamps are convert to string so its giving error because its creating conflicts between postgres columns datatype
CREATE TABLE IF NOT EXISTS public.integration_rate_limits(
id UUID,
reset_date_time TIMESTAMP,
“organization_id” UUID,
created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP)
[agent]
interval = “1s”
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = “0s”
flush_interval = “1s”
flush_jitter = “0s”
precision = “”
hostname = “”
omit_hostname = true
[[inputs.kafka_consumer]]
name_override = “integration_rate_limits”
brokers = [“localhost:29092”]
topics = [“otg.ratelimit-telegraf.t”]
metadata_full = true
consumer_group = “consumer_group_rate_limit”
max_undelivered_messages = 1
max_message_len = 1000000
data_format=“json_v2”
[[inputs.kafka_consumer.json_v2]]
[[inputs.kafka_consumer.json_v2.field]]
path = “event.body.id”
rename = “id”
type = “string”
optional = true
[[inputs.kafka_consumer.json_v2.field]]
path = “event.body.type”
rename = “type”
type = “string”
optional = true
[[inputs.kafka_consumer.json_v2.field]]
path = “event.body.limit”
type = “int”
optional = true
[[inputs.kafka_consumer.json_v2.field]]
path = “event.body.remaining”
type = “int”
optional = true
[[inputs.kafka_consumer.json_v2.field]]
path = “event.body.used”
type = “int”
optional = true
[[inputs.kafka_consumer.json_v2.field]]
path = “event.body.resetDateTime”
rename = “reset_date_time”
optional = true
# [[inputs.kafka_consumer.json_v2.field]]
# path = “event.body.lastPolledDateTime”
# rename = “last_polled_date_time”
# type = “string”
# optional = true
# [[inputs.kafka_consumer.json_v2.tag]]
# path = “event.body.organization.id”
# rename = “organization_id”
# type = “string”
# optional = true
# [[inputs.kafka_consumer.json_v2.tag]]
# path = “event.body.integration.id”
# rename = “integration_id”
# type = “string”
# optional = true
[[inputs.kafka_consumer.json_v2]]
timestamp_path = “event.body.resetDateTime”
timestamp_format = “rfc3339”
[[processors.timestamp]]
field = “reset_date_time”
date_format = “unix”
timezone = “UTC”
source_timestamp_format = “RFC3339”
source_timestamp_timezone=“UTC”
destination_timestamp_format = “RFC3339”
destination_timestamp_timezone = “UTC”
[[outputs.postgresql]]
schema = “public”
connection = "host=localhost user=postgres password=password sslmode=disable dbname=postgres port=5432 pool_max_conns = 10 pool_min_conns = 1 pool_max_conn_lifetime = 2000s pool_max_conn_idle_time = 10s pool_health_check_period = 10s "
timestamp_column_name = “created_date”
timestamp_column_type = “timestamp with time zone”
create_templates = [
‘’’
INSERT INTO public.integration_rate_limits (id, type) VALUES({{ .id}}::UUID, {{ .type}}, {{ .limit}}, {{ .remaining}}, {{ .resetDateTime}}, {{ .lastPolledDateTime}}, {{ .organization_id}}, {{ .integration_id}} );
‘’’
]
log_level = “debug”