Examples
See your project in action!
This is a placeholder page that shows you how to use this template site.
Do you have any example applications or code for your users in your repo or elsewhere? Link to your examples here.
fun main() {
stream {
val postgresConfig = postgresSourceConfig("mypostgres", "postgres", 5432, "postgres", "mysecretpassword", "dvdrental")
val mongoConfig = mongoConfig("mongosink", "mongodb://mongo", "@mongodump")
// Start with the 'film' collection
postgresSource("public", "film", postgresConfig) {
// Clear the last_update field, it makes no sense in a denormalized situation
set { _, film, _ ->
film["last_update"] = null; film
}
// Join with something that also uses the film_id key space.
// optional = true so films without any actors (animation?) will propagate
// multiple = true we're not joining with something that actually 'has' a film id
// we are joining with something that is grouped by film_id
joinGrouped(optional = true) {
postgresSource("public", "film_actor", postgresConfig) {
joinRemote({ msg -> "${msg["actor_id"]}" }, false) {
postgresSource("public", "actor", postgresConfig) {
}
}
// copy the first_name, last_name and actor_id to the film_actor message, drop the last update
set { _, actor_film, actor ->
actor_film["last_name"] = actor["last_name"]
actor_film["first_name"] = actor["first_name"]
actor_film["actor_id"] = actor["actor_id"]
actor_film["last_update"] = null
actor_film
}
// group the film_actor stream by film_id
// note the string wrapping: A key always needs to be a string
group { msg -> "${msg["film_id"]}" }
}
}
// ugly hack: As lists of messages can't be top level, a grouped message always consist of a single, otherwise empty message, that only
// contains one field, which is a list of the grouped messages, and that field is always named 'list'
// Ideas welcome
set { _, film, actorlist ->
film["actors"] = actorlist["list"] ?: emptyList<IMessage>()
film
}
// Now we're done with joining the actors. Let's join the categories, same drill, but simpler:
joinGrouped {
postgresSource("public", "film_category", postgresConfig) {
joinRemote({ msg -> "${msg["category_id"]}" }, true) {
postgresSource("public", "category", postgresConfig) {}
}
set { _, msg, category ->
msg["category"] = category["name"] ?: "unknown"
msg
}
group { msg -> "${msg["film_id"]}" }
}
}
set { _, msg, categories ->
msg["categories"] = categories["list"] ?: empty()
msg
}
joinRemote({ msg -> msg["language_id"].toString() }) {
postgresSource("public", "language", postgresConfig) {}
}
set { _, film, language ->
film["language"] = language["name"];
film["language_id"] = null
film
}
// pass this message to the mongo sink
mongoSink("filmwithactors", "filmwithcat", mongoConfig)
}
}.renderAndStart(URL("http://localhost:8083/connectors"), "localhost:9092")
logger.info { "done!" }
}
Last modified May 13, 2020: moving stuff around (7b54cd6)