Question: Parsing CSV-File without starving CPU #698

@sven42

Hello,

I am parsing a fairly large CSV file and get a warning from cats-effect about CPU starvation.

Here is the code I use:

import blobstore.url.Url
import cats.effect.IO
import fs2.{text, Pipe, Stream}
import fs2.compression.Compression
import fs2.io.file.{Files, Path}
import fs2.data.csv.*

trait Csv[T]  {

  val decode: Pipe[IO, String, T]
  val encode: Pipe[IO, T, String]

  def streamFrom(path: Path): Stream[IO, T] = decoded(load(path))

  def decoded(stream: Stream[IO, Byte]): Stream[IO, T] = stream.through(text.utf8.decode).through(decode)

  def streamTo(path: Path, data: Stream[IO, T]): IO[Unit] =
    save(path, data.through(encode andThen fs2.text.utf8.encode))

  def load(path: Path): Stream[IO, Byte] = {
    val byteStream = Files[IO].readAll(path)
    if (path.toString.endsWith(".gz")) byteStream.through(gunzip) else byteStream
  }

  val gunzip: Pipe[IO, Byte, Byte] = _.through(Compression[IO].gunzip()).flatMap(_.content)
}

object Csv {
  def apply[T](
      csvSeparator: Char = ';'
  )(implicit rowDecoder: CsvRowDecoder[T, String], rowEncoder: CsvRowEncoder[T, String]): Csv[T] =
    new Csv[T] {
      override val decode: Pipe[IO, String, T] = decodeUsingHeaders[T](csvSeparator)

      override val encode: Pipe[IO, T, String] = encodeUsingFirstHeaders(fullRows = true, csvSeparator)

    }
}

  val historicalDataCsv: Csv[HistoricalData] =
    Csv.apply()(deriveCsvRowDecoder[HistoricalData], deriveCsvRowEncoder[HistoricalData])

  def load(path: Url.Plain): IO[Map[String, HistoricalData]] =
    for {
      _          <- effectLogger.info(s"Loading catalog ${path.path.absolute} ...")
      catalogMap <- historicalDataCsv
                      .streamFrom(path)
                      .map(catalogData => catalogData.key -> catalogData)
                      .compile
                      .toList

      _ <- effectLogger.info(s"Loading catalog succeeded. Found ${catalogMap.size} entries")
    } yield catalogMap.toMap

I print the thread name whenever an instance of the case class HistoricalData is created, and it does run on a compute thread, e.g. "io-compute-4".

What can I do to avoid starving the CPU?

I already tried to run it with IO.blocking(()).flatMap(_ => ...) without success.
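
To illustrate the attempt above, here is a minimal sketch (not the actual project code) that records which thread runs inside IO.blocking and which thread runs the continuation after it. The names BlockingShiftSketch, currentThread and threadNames are hypothetical and exist only for this example. As far as I understand cats-effect 3, only the thunk passed to IO.blocking runs on a blocking thread, and whatever follows in flatMap is scheduled on the compute pool again, which would match the "io-compute-4" observation. The exact thread names depend on the cats-effect version.

```scala
import cats.effect.{IO, IOApp}

object BlockingShiftSketch extends IOApp.Simple {

  // Captures the name of the thread that evaluates this effect.
  val currentThread: IO[String] = IO(Thread.currentThread().getName)

  // Returns (thread used inside IO.blocking, thread used by the continuation).
  val threadNames: IO[(String, String)] =
    for {
      inside <- IO.blocking(Thread.currentThread().getName) // only this thunk is marked as blocking
      after  <- currentThread                               // the continuation is a separate step
    } yield (inside, after)

  // Prints both names so the two steps can be compared.
  def run: IO[Unit] =
    threadNames.flatMap { case (inside, after) =>
      IO.println(s"inside IO.blocking: $inside") *>
        IO.println(s"continuation:       $after")
    }
}
```

So IO.blocking(()).flatMap(_ => ...) marks only the empty () thunk as blocking, not the stream that is compiled afterwards.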

Thanks in advance and best regards,
Sven
