Skip to content

Instantly share code, notes, and snippets.

@satabin
Last active August 12, 2021 08:58
Show Gist options
  • Select an option

  • Save satabin/577c811e7424b516e9d6ff3fa59d8941 to your computer and use it in GitHub Desktop.

Select an option

Save satabin/577c811e7424b516e9d6ff3fa59d8941 to your computer and use it in GitHub Desktop.
Rule Engine Part 2, Post snippets
import fs2._
import cats.syntax.all._
import fs2.data.csv._
/** Decodes a stream of characters into CSV rows keyed by the header line.
  *
  * @param content the raw CSV characters, header line first
  * @tparam F an effect able to raise the errors produced by the decoder
  * @return one `CsvRow[String]` per data line of `content`
  */
def parseCSV[F[_]: RaiseThrowable](
    content: Stream[F, Char]): Stream[F, CsvRow[String]] = {
  // Naming the pipe pins its input and output types in one place.
  val headerAwareDecoder: Pipe[F, Char, CsvRow[String]] = decodeUsingHeaders()
  content.through(headerAwareDecoder)
}
// Example: decode a small CSV document in the pure `Fallible` effect.
// The string literal must use plain ASCII double quotes; typographic quotes do not compile.
parseCSV[Fallible](Stream.emits("a,b,c\n1,2,3")).compile.toList
// Right(List(RowF(NonEmptyList(1, 2, 3),Some(NonEmptyList(a, b, c)),None)))
/** A named rewriting rule for CSV rows.
  *
  * @param name            identifier of the rule
  * @param matcher         condition a row must satisfy for the rule to apply
  * @param transformations transformations associated with the rule, in list order
  */
final case class Rule(
    name: String,
    matcher: RowMatcher,
    transformations: List[RowTransformation]
)
/** Conditions that can be evaluated against a CSV row. */
enum RowMatcher {

  /** Holds a field name and the exact value to compare it with. */
  case FieldIs(field: String, value: String)

  /** Holds a field name and a regular expression for its value. */
  case FieldMatches(field: String, regex: String)

  /** Conjunction of two matchers. */
  case And(left: RowMatcher, right: RowMatcher)

  /** Disjunction of two matchers. */
  case Or(left: RowMatcher, right: RowMatcher)

  /** Negation of a matcher. */
  case Not(inner: RowMatcher)
}
/** Transformations that a rule can apply to a CSV row. */
enum RowTransformation {

  /** Marks the row for removal. */
  case Delete

  /** Holds a field name and the value to put in it. */
  case Replace(field: String, by: String)

  /** Holds a field name, a regular expression to search in it, and the replacement text. */
  case SearchAndReplace(field: String, regex: String, repl: String)
}
// Rules for the stops file, listed in the same order as in `stopRules`.
// Order matters: `transformRow` only applies the first rule whose matcher
// accepts a row.
//
// rule remove_old_station {
//   when field stop_id == "stop3"
//   do delete
// }
// rule remove_berlin {
//   when field stop_name matches /.* (Berlin)$/
//   do search / (Berlin)$/ in field stop_name and replace with ""
// }
//
// In the Scala regexes below the parentheses are escaped because they are
// literal characters of the stop name, not a capture group.
val stopRules: List[Rule] = List(
  Rule(
    "remove_old_station",
    RowMatcher.FieldIs("stop_id", "stop3"),
    List(RowTransformation.Delete)
  ),
  Rule(
    "remove_berlin",
    RowMatcher.FieldMatches("stop_name", """.* \(Berlin\)$"""),
    // Anchored with `$` like the rule above, so only the trailing
    // " (Berlin)" suffix is removed, never an occurrence inside the name.
    List(RowTransformation.SearchAndReplace("stop_name", """ \(Berlin\)$""", ""))
  )
)
// Rules for the routes file.
//
// rule ubahn_type {
//   when field route_short_name matches /U\d+/ and field route_type == "400"
//   do replace field route_type by "402"
// }
val routeRules: List[Rule] = List(
  Rule(
    name = "ubahn_type",
    matcher = RowMatcher.And(
      left = RowMatcher.FieldMatches(field = "route_short_name", regex = """U\d+"""),
      right = RowMatcher.FieldIs(field = "route_type", value = "400")
    ),
    transformations = List(
      RowTransformation.Replace(field = "route_type", by = "402")
    )
  )
)
/** Evaluates a matcher against a row.
  *
  * Field conditions are `false` when `row(field)` yields no value.
  * `FieldMatches` relies on `String.matches`, so the regular expression
  * has to match the whole cell.
  *
  * @param row     the row to test
  * @param matcher the condition to evaluate
  * @return `true` if `row` satisfies `matcher`
  */
def matches(row: CsvRow[String], matcher: RowMatcher): Boolean =
  matcher match {
    case RowMatcher.Not(negated) =>
      !matches(row, negated)
    case RowMatcher.Or(first, second) =>
      matches(row, first) || matches(row, second)
    case RowMatcher.And(first, second) =>
      matches(row, first) && matches(row, second)
    case RowMatcher.FieldMatches(column, pattern) =>
      row(column).exists(cell => cell.matches(pattern))
    case RowMatcher.FieldIs(column, expected) =>
      row(column).contains(expected)
  }
/** Applies a single transformation to a row.
  *
  * @param row            the row to transform
  * @param transformation the transformation to apply
  * @return `None` for `Delete`, otherwise the row returned by `updated`
  *         (for `Replace`) or `modify` (for `SearchAndReplace`), wrapped in `Some`
  */
def applyTransformation(row: CsvRow[String], transformation: RowTransformation): Option[CsvRow[String]] =
  transformation match {
    case RowTransformation.SearchAndReplace(column, pattern, replacement) =>
      // `replaceAll` treats `pattern` as a regular expression.
      Some(row.modify(column)(cell => cell.replaceAll(pattern, replacement)))
    case RowTransformation.Replace(column, newValue) =>
      Some(row.updated(column, newValue))
    case RowTransformation.Delete =>
      None
  }
/** Runs the first matching rule, if any, on a row.
  *
  * Only the first rule of `rules` whose matcher accepts `row` is used; its
  * transformations are applied in list order. Once one of them removes the
  * row, the remaining ones are skipped.
  *
  * @param row   the row to process
  * @param rules candidate rules, in priority order
  * @return the transformed row, the unchanged row when no rule matches,
  *         or `None` when the row was removed
  */
def transformRow(row: CsvRow[String], rules: List[Rule]): Option[CsvRow[String]] = {
  val firstApplicable = rules.find(candidate => matches(row, candidate.matcher))
  firstApplicable match {
    case None =>
      Some(row)
    case Some(selected) =>
      selected.transformations.foldLeft[Option[CsvRow[String]]](Some(row)) { (current, step) =>
        current.flatMap(remaining => applyTransformation(remaining, step))
      }
  }
}
/** Applies `transformRow` to every row of a stream and drops removed rows.
  *
  * @param rows  the rows to process
  * @param rules the rules handed to `transformRow` for each row
  * @return the rows for which `transformRow` returned a value
  */
def transform(rows: Stream[Fallible, CsvRow[String]], rules: List[Rule]): Stream[Fallible, CsvRow[String]] = {
  val outcomes = rows.map(current => transformRow(current, rules))
  // Keep only the rows that survived their rule.
  outcomes.collect { case Some(kept) => kept }
}
/** Parses a CSV document, applies the rules to it and renders it back to text.
  *
  * @param csv   the CSV document, header line first
  * @param rules the rules to apply to each row
  * @return the re-encoded document, or the error raised while processing it
  */
def process(csv: String, rules: List[Rule]): Either[Throwable, String] = {
  val parsedRows = parseCSV[Fallible](Stream.emits(csv))
  val rewrittenRows = transform(parsedRows, rules)
  // Concatenate the encoded chunks into a single string.
  rewrittenRows
    .through(encodeUsingFirstHeaders())
    .compile
    .foldMonoid
}
/** Sample stops document: a header line followed by four stops. */
val stops: String = """stop_id,stop_name
|stop1,S+U Alexanderplatz (Berlin)
|stop2,U Möckernbrücke
|stop3,U Französische Straße
|stop4,U Turmstraße""".stripMargin
/** Sample routes document: a header line followed by two routes. */
val routes: String = """route_id,route_short_name,route_type,route_color,route_text_color
|route1,U1,400,,
|route2,M27,700,a41e78,ffffff""".stripMargin
// Apply the stop rules to the sample stops document.
process(csv = stops, rules = stopRules)
// Result shown in the original snippet:
// Right(stop_id,stop_name
// stop1,S+U Alexanderplatz
// stop2,U Möckernbrücke
// stop4,U Turmstraße
// )

// Apply the route rules to the sample routes document.
process(csv = routes, rules = routeRules)
// Result shown in the original snippet:
// Right(route_id,route_short_name,route_type,route_color,route_text_color
// route1,U1,402,,
// route2,M27,700,a41e78,ffffff
// )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment