Streaming Vault Audit Logs to Elasticsearch
December 10, 2018
vault
ksql
kafka
elasticsearch
kafka-connect
Confluent’s kafka-connect-elasticsearch connector allows you to read messages, in Avro format, from a Kafka topic and insert them into an Elasticsearch index.
Vault only supports writing to Kafka in JSON format, so we’ll use KSQL to convert the messages to Avro, and then use Kafka Connect to get the messages into Elasticsearch.
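For reference, an audit entry on the vault topic looks roughly like this (an illustrative, truncated sample; real entries HMAC the token values by default and carry more fields):
{
  "time": "2018-12-10T09:41:13.123456Z",
  "type": "request",
  "auth": {
    "client_token": "hmac-sha256:...",
    "accessor": "hmac-sha256:...",
    "display_name": "token",
    "policies": ["default"],
    "token_policies": ["default"],
    "entity_id": "",
    "token_type": "service"
  },
  "request": {
    "id": "7c3d6a2e-...",
    "operation": "read",
    "path": "secret/data/app",
    "remote_address": "127.0.0.1"
  }
}
First, declare a KSQL stream over the JSON topic, mapping the fields we care about: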
CREATE STREAM vault_audit_logs
(
  time VARCHAR,
  type VARCHAR,
  auth STRUCT<
    client_token VARCHAR,
    accessor VARCHAR,
    display_name VARCHAR,
    policies ARRAY<STRING>,
    token_policies ARRAY<STRING>,
    entity_id VARCHAR,
    token_type VARCHAR>,
  request STRUCT<
    id VARCHAR,
    operation VARCHAR,
    path VARCHAR,
    remote_address VARCHAR>,
  response STRUCT<data STRUCT<error VARCHAR>>,
  error VARCHAR
)
WITH (
  KAFKA_TOPIC='vault',
  VALUE_FORMAT='JSON',
  TIMESTAMP='time',
  TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss[.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]''Z'''
);
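To sanity-check the stream, you can run a quick query from the KSQL CLI (assuming a few audit entries have already landed on the topic):
SELECT time, type, request->path FROM vault_audit_logs LIMIT 5;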
Then persist it to the vault_avro topic:
CREATE STREAM vault_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='vault_avro') AS SELECT * FROM vault_audit_logs;
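KSQL registers the Avro schema for the new topic with the Schema Registry automatically. You can confirm it is there (assuming the registry runs on localhost:8081, as configured below):
curl -s http://localhost:8081/subjects/vault_avro-value/versions/latest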
To send the logs to Elasticsearch, create a Kafka Connect connector that reads from this Avro topic and indexes each message into Elasticsearch. Here the connector is managed with the Terraform kafka-connect provider:
provider "kafka-connect" {
url = "http://localhost:8083"
}
resource "kafka-connect_connector" "vault-es" {
name = "vault-es"
config = {
"connector.class" = "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
"value.converter" = "io.confluent.connect.avro.AvroConverter"
"key.converter" = "org.apache.kafka.connect.storage.StringConverter"
"key.converter.schema.registry.url" = "http://localhost:8081"
"value.converter.schema.registry.url" = "http://localhost:8081"
"connection.url" = "http://localhost:9200"
"type.name" = "type.name=kafka-connect"
"topics" = "vault_avro"
"key.ignore" = "true"
}
}
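After a terraform apply, the Kafka Connect REST API reports whether the connector and its task are running:
curl -s http://localhost:8083/connectors/vault-es/status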
It is common to use daily indices in Elasticsearch so that old data can be rotated out.
We can use Kafka Connect’s Single Message Transforms to stamp each message with its timestamp and route it to a daily index.
Because the stream was declared with TIMESTAMP='time', each record’s Kafka timestamp is the time Vault logged the event, rather than the time the message was produced to Kafka. Copy that timestamp into each document with:
"transforms.InsertTimeStamp.type" = "org.apache.kafka.connect.transforms.InsertField$Value"
"transforms.InsertTimeStamp.timestamp.field" = "timestamp!"
The trailing ! marks the field as required. Then route each message to a daily index with:
"transforms.routeTS.type" = "org.apache.kafka.connect.transforms.TimestampRouter"
"transforms.routeTS.timestamp.format" = "yyyy.MM.dd"
"transforms.routeTS.topic.format" = "vault-$${timestamp}"
Finally, enable both transforms by listing them in the transforms property:
"transforms" = "routeTS,InsertTimeStamp"