BaffleSMT
The following provides usage information for io.baffle.datapipeline.kafka.transforms.BaffleSMT.
Description
BaffleSMT transforms the value of a Kafka record according to the transformation policy supplied in a config file. Most connectors can use this SMT, unless the connector changes the Kafka data schema. For those cases, Baffle provides a connector-specific SMT.
Use the concrete transformation type designed for the record value: io.baffle.datapipeline.kafka.transforms.BaffleSMT$Value.
Example
BaffleSMT can transform one or more fields of a complex (Struct or Map) value.
"transforms":"encryptVal",
"transforms.encryptVal.type":"io.baffle.datapipeline.kafka.transforms.BaffleSMT$Value",
"transforms.encryptVal.configFilePath":"/connectors/config.toml"
Before:
{
"id": "1000",
"type": { "id" : "10001", "name" : "donut"},
"name" : "Boston Creme",
"pricePerUnit": 0.75,
"topping":
[
{ "id": "5001", "type": "None"},
{ "id": "5002", "type": "Sprinkles"}
]
}
Policy:
The Kafka record in the "donut.inventory" topic needs to be transformed with the following policy. The policy transforms the name values, including those nested inside the JSON structure.
[Policy]
version = "1.0"
[[Rules]]
match = ".*donut.inventory.*"
defaultAction = "fpe-encrypt"
defaultKeyId = 2
[[Rules.Transforms]]
fields = ["name", "type.name", "item.topping.type"]
More detail on the policy can be found below or in the documentation folder of the Baffle Transformation package.
After:
{
"id": "1000",
"type": { "id" : "10001", "name" : "rdcgo"},
"name" : "Tbhludg Jigar",
"pricePerUnit": 0.75,
"topping":
[
{ "id": "5001", "type": "Slkd"},
{ "id": "5002", "type": "Ifibmuwaw" }
]
}
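The dot notation in the policy's fields list can be pictured as a walk through the nested record. The Python sketch below is only an illustration under stated assumptions, not Baffle's implementation: transform_fields, apply_to_path, and mask are hypothetical names, mask is a stand-in that replaces each character with "*" (it is not FPE encryption), and applying a path step to every element of an array is an assumption about array handling.

```python
import copy
from typing import Any, Callable


def apply_to_path(node: Any, path: list[str], fn: Callable[[Any], Any]) -> None:
    """Apply fn in place to every value reached by following path from node."""
    if isinstance(node, list):
        # Assumption: a path step is applied to each element of an array.
        for item in node:
            apply_to_path(item, path, fn)
        return
    if not isinstance(node, dict) or path[0] not in node:
        return  # the path does not exist in this record; leave it untouched
    key, rest = path[0], path[1:]
    if rest:
        apply_to_path(node[key], rest, fn)
    else:
        node[key] = fn(node[key])


def transform_fields(record: dict, fields: list[str], fn: Callable[[Any], Any]) -> dict:
    """Return a copy of record with fn applied to each dot-notation field."""
    result = copy.deepcopy(record)
    for field in fields:
        apply_to_path(result, field.split("."), fn)
    return result


def mask(value: str) -> str:
    """Placeholder transformation (NOT fpe-encrypt): keeps only the length."""
    return "*" * len(value)


RECORD = {
    "id": "1000",
    "type": {"id": "10001", "name": "donut"},
    "name": "Boston Creme",
    "pricePerUnit": 0.75,
    "topping": [
        {"id": "5001", "type": "None"},
        {"id": "5002", "type": "Sprinkles"},
    ],
}

MASKED = transform_fields(RECORD, ["name", "type.name", "topping.type"], mask)
```

Fields that are not listed (id, type.id, pricePerUnit) pass through unchanged, which mirrors the Before and After documents above.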
BaffleDebeziumSMT
The following provides usage information for io.baffle.datapipeline.kafka.transforms.BaffleDebeziumSMT.
Description
BaffleDebeziumSMT transforms the value of a Kafka record according to the transformation policy supplied in a config file. This SMT is specific to the Debezium connector.
Use the concrete transformation type designed for the record value: io.baffle.datapipeline.kafka.transforms.BaffleDebeziumSMT$Value.
Example
BaffleDebeziumSMT can transform one or more fields of a complex (Struct or Map) value.
"transforms":"encryptVal",
"transforms.encryptVal.type":"io.baffle.datapipeline.kafka.transforms.BaffleDebeziumSMT$Value",
"transforms.encryptVal.configFilePath":"/connectors/config.toml"
Before:
{
"payload": {
"before": {},
"after": {
"id": "1000",
"type": { "id" : "10001", "name" :"donut"},
"name" : "Boston Creme",
"pricePerUnit": 0.75,
"topping":
[
{ "id": "5001", "type": "None"},
{ "id": "5002", "type": "Sprinkles"}
]
}
}
}
Policy:
The Kafka record in the "donut.inventory" topic needs to be transformed with the following policy (config.toml). The policy transforms the name values, including those nested inside the JSON structure.
[Policy]
version = "1.0"
[[Rules]]
match = ".*donut.inventory.*"
defaultAction = "fpe-encrypt"
defaultKeyId = 2
[[Rules.Transforms]]
fields = ["name", "type.name", "item.topping.type"]
More detail on the policy can be found below or in the documentation folder of the Baffle Transformation package.
After:
{
"payload": {
"before": {},
"after": {
"id": "1000",
"type": { "id" : "10001", "name" : "rdcgo"},
"name" : "Tbhludg Jigar",
"pricePerUnit": 0.75,
"topping":
[
{ "id": "5001", "type": "Slkd"},
{ "id": "5002", "type": "Ifibmuwaw"}
]
}
}
}
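Comparing the Before and After documents, the policy's field paths (name, type.name) are applied to the row image under payload.after rather than to the top level of the record value. The Python sketch below illustrates that unwrapping step only. It is a sketch under assumptions: transform_after_image and mask_name are hypothetical names, mask_name is a placeholder rather than FPE encryption, and whether a non-empty payload.before image is also transformed is not shown in the example, so the sketch leaves it untouched.

```python
import copy
from typing import Callable


def transform_after_image(envelope: dict, fn: Callable[[dict], dict]) -> dict:
    """Return a copy of a change-event envelope with fn applied to payload.after."""
    result = copy.deepcopy(envelope)
    payload = result.get("payload", {})
    after = payload.get("after")
    if after:  # skip a missing or empty row image
        payload["after"] = fn(after)
    return result


def mask_name(row: dict) -> dict:
    """Placeholder row transformation (NOT fpe-encrypt): masks the name field."""
    return {**row, "name": "*" * len(row["name"])}


EVENT = {
    "payload": {
        "before": {},
        "after": {"id": "1000", "name": "Boston Creme", "pricePerUnit": 0.75},
    }
}

MASKED_EVENT = transform_after_image(EVENT, mask_name)
```

The envelope keys (payload, before, after) stay in place, so downstream consumers still see the same change-event shape.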
Transformation Policy
The transformation policy supports the following settings:
[Policy]
version = "1.0" # required
[[Rules]]
match = {{topic regex}} # required: regex for the topic/stream to match
defaultAction = "fpe-encrypt" # optional: default action; valid actions are fpe-encrypt, fpe-decrypt, fph-encrypt, ctr-encrypt, ctr-decrypt
defaultKeyId = 2 # optional: default key id for the rule
[[Rules.Transforms]]
fields = [ "id", "id.level1.level2", ...] # required: fields to act upon; dot notation is supported
KeyId = 12 # optional: a key id is required either at the rule level (defaultKeyId) or here
dataType = "integer" # optional: if not provided, the data type is determined automatically; valid types are string, integer, long, double, float, bigint, bigdecimal, localdate, localtime, and localdatetime
action = "ctr-decrypt" # optional: an action is required either at the rule level (defaultAction) or here
characterFormat = "cc" | "email" | "digit" # optional: ALPHANUM by default; only needed for the String type
pos = [start, stop] # optional: start and stop parameters
[[Rules.Transforms]]
fields = [ set of matching fields ]
transformations...
[[Rules]]
match = {{another topic match}}
Please submit a request on the Baffle Support Portal if you require support configuring a transformation policy.
Tip
For a tutorial and a deep dive into this topic, see How to Use Single Message Transforms in Kafka Connect.