Baffle Data Protection Services (DPS) Transform for Kafka

 

Baffle Data Protection Services (DPS) Transforms are single message transforms (SMTs) that are applied to messages as they flow through Apache Kafka Connect. For inbound messages, an SMT runs after a source connector has produced the message but before it is written to Kafka. For outbound messages, an SMT runs before the message is sent to a sink connector. The following SMTs are available for use with Kafka Connect.
 

BaffleSMT

The following provides usage information for BaffleSMT (io.baffle.datapipeline.kafka.transforms.BaffleSMT).

Description

BaffleSMT transforms the value of a Kafka record according to the transformation policy passed in a config file. Most connectors can use this SMT, unless the connector changes the Kafka data schema. In that case, Baffle provides a connector-specific SMT.

Use the concrete transformation type designed for the record value (io.baffle.datapipeline.kafka.transforms.BaffleSMT$Value).

Example

BaffleSMT can transform one or more fields of a complex (Struct or Map) value.

"transforms":"encryptVal",
"transforms.encryptVal.type":"io.baffle.datapipeline.kafka.transforms.BaffleSMT$Value",
"transforms.encryptVal.configFilePath":"/connectors/config.toml"

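To show where these properties sit in a complete connector definition, the following Python sketch assembles a connector configuration and defines a helper that submits it to the Kafka Connect REST API (PUT /connectors/{name}/config). The connector class, connector name, and Connect URL are hypothetical placeholders that do not come from this article; only the three transforms properties are taken from the example above.

```python
import json
import urllib.request


def build_connector_config(config_file_path):
    """Return a connector config that attaches BaffleSMT to record values."""
    return {
        # Hypothetical placeholder: replace with the connector you actually run.
        "connector.class": "com.example.HypotheticalSinkConnector",
        "topics": "donut.inventory",
        # The three properties below are the ones shown in this article.
        "transforms": "encryptVal",
        "transforms.encryptVal.type":
            "io.baffle.datapipeline.kafka.transforms.BaffleSMT$Value",
        "transforms.encryptVal.configFilePath": config_file_path,
    }


def submit(connect_url, name, config):
    """Create or update a connector through the Kafka Connect REST API."""
    request = urllib.request.Request(
        f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


config = build_connector_config("/connectors/config.toml")
print(json.dumps(config, indent=2))
# submit("http://localhost:8083", "donut-sink", config)  # hypothetical URL and name
```

The submit call is left commented out so the sketch can be run without a Kafka Connect worker.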
Before:

{
 "id": "1000",
 "type": { "id" : "10001", "name" : "donut"},
 "name" : "Boston Creme",
 "pricePerUnit": 0.75,
 "topping":
    [
    { "id": "5001", "type": "None"},
    { "id": "5002", "type": "Sprinkles"}
    ]
}

Policy:

Kafka records in the "donut.inventory" topic need to be transformed with the following policy. The policy encrypts the listed fields, including fields nested inside the JSON structure.

[Policy]
  version = "1.0"
[[Rules]]
  match = ".*donut.inventory.*"
  defaultAction = "fpe-encrypt"
  defaultKeyId = 2
[[Rules.Transforms]]
  fields = ["name", "type.name", "item.topping.type"]

More detail on the policy can be found below or in the documentation folder of the Baffle Transformation package.

After:

{
 "id": "1000",
 "type": { "id" : "10001", "name" : "rdcgo"},
 "name" : "Tbhludg Jigar",
 "pricePerUnit": 0.75,
 "topping":
   [
    { "id": "5001", "type": "Slkd"},
    { "id": "5002", "type": "Ifibmuwaw" }
   ]
}
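
The effect of dot notation on a nested value can be sketched in a few lines of Python. This is an illustration only, not Baffle's implementation: the mask function is a stand-in that reverses strings instead of encrypting them, fanning out over list elements is an assumption, and the array path is written as topping.type because the handling of the item prefix in the policy above is not modeled here.

```python
import copy


def mask(value):
    """Stand-in for the configured action (e.g. fpe-encrypt).

    It only reverses strings so the changed fields are visible.
    This is NOT encryption.
    """
    return value[::-1] if isinstance(value, str) else value


def apply_path(node, parts, fn):
    """Apply fn to every value reached by following the dotted path."""
    if isinstance(node, list):
        # Assumption: a path through an array applies to each element.
        for item in node:
            apply_path(item, parts, fn)
        return
    if not isinstance(node, dict) or parts[0] not in node:
        return  # the path does not exist here; leave the record untouched
    key, rest = parts[0], parts[1:]
    if rest:
        apply_path(node[key], rest, fn)
    else:
        node[key] = fn(node[key])


def transform(record, fields, fn=mask):
    """Return a transformed copy of record; the input is not modified."""
    result = copy.deepcopy(record)
    for field in fields:
        apply_path(result, field.split("."), fn)
    return result


record = {
    "id": "1000",
    "type": {"id": "10001", "name": "donut"},
    "name": "Boston Creme",
    "pricePerUnit": 0.75,
    "topping": [
        {"id": "5001", "type": "None"},
        {"id": "5002", "type": "Sprinkles"},
    ],
}

out = transform(record, ["name", "type.name", "topping.type"])
print(out["name"], out["type"]["name"], [t["type"] for t in out["topping"]])
```

Fields that are not listed, such as id and pricePerUnit, pass through unchanged, which matches the Before and After records above.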

BaffleDebeziumSMT

The following provides usage information for BaffleDebeziumSMT (io.baffle.datapipeline.kafka.transforms.BaffleDebeziumSMT).

Description

BaffleDebeziumSMT transforms the value of a Kafka record according to the transformation policy passed in a config file. This SMT is specific to the Debezium connector.

Use the concrete transformation type designed for the record value (io.baffle.datapipeline.kafka.transforms.BaffleDebeziumSMT$Value).

Example

BaffleDebeziumSMT can transform one or more fields of a complex (Struct or Map) value.

"transforms":"encryptVal",
"transforms.encryptVal.type":"io.baffle.datapipeline.kafka.transforms.BaffleDebeziumSMT$Value",
"transforms.encryptVal.configFilePath":"/connectors/config.toml"

Before:

{
 "payload": {
   "before": {},
   "after": {
     "id": "1000",
     "type": { "id" : "10001", "name" :"donut"},
     "name" : "Boston Creme",
     "pricePerUnit": 0.75,
     "topping":
     [
       { "id": "5001", "type": "None"},
       { "id": "5002", "type": "Sprinkles"}
     ]
   }
 }
}

Policy:

Kafka records in the "donut.inventory" topic need to be transformed with the following policy (config.toml). The policy encrypts the listed fields, including fields nested inside the JSON structure.

[Policy]
  version = "1.0"
[[Rules]]
  match = ".*donut.inventory.*"
  defaultAction = "fpe-encrypt"
  defaultKeyId = 2
[[Rules.Transforms]]
  fields = ["name", "type.name", "item.topping.type"]

More detail on the policy can be found below or in the documentation folder of the Baffle Transformation package.

After:

{
 "payload": {
   "before": {},
   "after": {
     "id": "1000",
     "type": { "id" : "10001", "name" : "rdcgo"},
     "name" : "Tbhludg Jigar",
     "pricePerUnit": 0.75,
     "topping":
     [
       { "id": "5001", "type": "Slkd"},
       { "id": "5002", "type": "Ifibmuwaw"}
     ]
   }
 }
}
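
A Debezium change event wraps the row in an envelope, and the example above suggests that the policy fields are resolved inside the row images under payload rather than at the top level of the record. The following Python sketch illustrates that idea under stated assumptions: mask is a string-reversing stand-in for the real action, the before and after images are treated the same way, and only dictionary paths are handled. It is not Baffle's implementation.

```python
import copy


def mask(value):
    """Stand-in for the configured action; reverses strings. NOT encryption."""
    return value[::-1] if isinstance(value, str) else value


def transform_row(row, fields, fn):
    """Apply fn to each dotted field path that exists in one row image."""
    for field in fields:
        *parents, leaf = field.split(".")
        node = row
        for key in parents:
            node = node.get(key) if isinstance(node, dict) else None
        if isinstance(node, dict) and leaf in node:
            node[leaf] = fn(node[leaf])


def transform_envelope(event, fields, fn=mask):
    """Return a copy of the event with both row images transformed."""
    result = copy.deepcopy(event)
    payload = result.get("payload") or {}
    for image in ("before", "after"):
        row = payload.get(image)
        # A row image can be null (before on insert, after on delete).
        if isinstance(row, dict):
            transform_row(row, fields, fn)
    return result


event = {
    "payload": {
        "before": None,
        "after": {
            "id": "1000",
            "type": {"id": "10001", "name": "donut"},
            "name": "Boston Creme",
            "pricePerUnit": 0.75,
        },
    }
}

out = transform_envelope(event, ["name", "type.name"])
print(out["payload"]["after"])
```

The field paths stay the same as in the BaffleSMT example; only the place where they are looked up changes.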

Transformation Policy

The following template documents the transformation policy format.

[Policy]
 version = "1.0" # required


[[Rules]]
  match = {{topic regex}} # required: regex matched against the topic/stream name
  defaultAction = "fpe-encrypt" # optional: default action; valid actions are fpe-encrypt, fpe-decrypt, fph-encrypt, ctr-encrypt, ctr-decrypt
  defaultKeyId = 2 # optional: default key id for the rule

[[Rules.Transforms]]
  fields = [ "id", "id.level1.level2", ...] # required: fields to act upon; dot notation is supported
  KeyId = 12 # optional: a key id is required either at the rule level (defaultKeyId) or at the transform level
  dataType = "integer" # optional: if not provided, the data type is determined automatically; valid types are string, integer, long, double, float, bigint, bigdecimal, localdate, localtime, and localdatetime
  action = "ctr-decrypt" # optional: an action is required either at the rule level (defaultAction) or at the transform level
  characterFormat = "cc" | "email" | "digit" # optional: ALPHANUM by default; only needed for the string type
  pos = [start, stop] # optional: start and stop parameters

[[Rules.Transforms]]
  fields = [ set of matching fields ]
  transformations...

[[Rules]]
  match = {{another topic match}}
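
The interplay between rule-level defaults and transform-level settings can be sketched in Python. The dictionary below has the shape a TOML parser returns for a policy file with one rule and two [[Rules.Transforms]] blocks. Several points are assumptions rather than documented behavior: that the topic regex is applied as a search, that a transform-level action or KeyId takes precedence over the rule default, and the pricePerUnit transform itself, which is invented for illustration.

```python
import re

# Parsed form of a hypothetical config.toml (illustrative values).
policy = {
    "Policy": {"version": "1.0"},
    "Rules": [
        {
            "match": ".*donut.inventory.*",
            "defaultAction": "fpe-encrypt",
            "defaultKeyId": 2,
            "Transforms": [
                # Uses the rule-level defaults.
                {"fields": ["name", "type.name"]},
                # Sets its own action and key id.
                {"fields": ["pricePerUnit"], "action": "ctr-encrypt",
                 "KeyId": 12, "dataType": "double"},
            ],
        }
    ],
}


def resolve(policy, topic):
    """List the effective action and key id per transform for a topic."""
    resolved = []
    for rule in policy.get("Rules", []):
        if not re.search(rule["match"], topic):  # assumption: search semantics
            continue
        for transform in rule.get("Transforms", []):
            action = transform.get("action", rule.get("defaultAction"))
            key_id = transform.get("KeyId", rule.get("defaultKeyId"))
            if action is None or key_id is None:
                raise ValueError(
                    "action and key id must be set on the rule or the transform"
                )
            resolved.append(
                {"fields": transform["fields"], "action": action, "keyId": key_id}
            )
    return resolved


for entry in resolve(policy, "prod.donut.inventory.v1"):
    print(entry)
```

Note that an unescaped dot in the match pattern matches any character, so ".*donut.inventory.*" also matches a topic such as "donut-inventory". Escape the dot as "\\." in the TOML string if only a literal dot should match.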

If you need help configuring a transformation policy, please submit a request on the Baffle Support Portal.

Tip

For a tutorial and a deep dive into this topic, see How to Use Single Message Transforms in Kafka Connect.
