Step 3: parse CSV
Description
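The binary data in `ByteString`s is passed into Alpakka CSV to be parsed and converted, line by line, into a map. With `CsvToMap.toMapAsStrings()` the stream elements become a `Map[String, String]` (`Map<String, String>` in Java), with one entry per column, using the column headers as keys. (Use `CsvToMap.toMap()` instead to keep the values as `Map[String, ByteString]` / `Map<String, ByteString>`.)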
Code
- Java
-
source
/* * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com> */ package samples.javadsl; import akka.Done; import akka.actor.typed.ActorSystem; import akka.actor.typed.javadsl.Behaviors; import akka.http.javadsl.Http; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.model.MediaRanges; import akka.http.javadsl.model.StatusCodes; import akka.http.javadsl.model.headers.Accept; import akka.stream.alpakka.csv.javadsl.CsvParsing; import akka.stream.alpakka.csv.javadsl.CsvToMap; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.concurrent.CompletionStage; import static akka.actor.typed.javadsl.Adapter.toClassic; public class Main { public static void main(String[] args) throws Exception { Main me = new Main(); me.run(); } final HttpRequest httpRequest = HttpRequest.create( "https://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download") .withHeaders(Collections.singletonList(Accept.create(MediaRanges.ALL_TEXT))); private Source<ByteString, ?> extractEntityData(HttpResponse httpResponse) { if (httpResponse.status() == StatusCodes.OK) { return httpResponse.entity().getDataBytes(); } else { return Source.failed(new RuntimeException("illegal response " + httpResponse)); } } private void run() throws Exception { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "alpakka-samples"); Http http = Http.get(toClassic(system)); CompletionStage<Done> completion = Source.single(httpRequest) // : HttpRequest .mapAsync(1, http::singleRequest) // : HttpResponse .flatMapConcat(this::extractEntityData) // : ByteString .via(CsvParsing.lineScanner()) // : List<ByteString> .via(CsvToMap.toMapAsStrings(StandardCharsets.UTF_8)) // : Map<String, String> .runWith(Sink.foreach(map -> map.entrySet().stream().forEach(System.out::println)), system); 
completion .thenAccept( done -> { System.out.println("Done!"); system.terminate(); }); } }
- Scala
-
source
/*
 * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com>
 */

package samples

import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges }
import akka.stream.alpakka.csv.scaladsl.{ CsvParsing, CsvToMap }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString

import scala.concurrent.Future

/** Sample: downloads a CSV listing over HTTP, parses it with Alpakka CSV and prints each row as a map. */
object Main extends App {

  implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "alpakka-samples")

  import actorSystem.executionContext

  /** Request for the CSV download; accepts any text media type. */
  val httpRequest = HttpRequest(uri = "https://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download")
    .withHeaders(Accept(MediaRanges.`text/*`))

  /**
   * Returns the body bytes of a 200 OK response; any other response becomes a failed source.
   *
   * @param response the response to unpack
   * @return the entity data bytes, or a failed source describing the unexpected response
   */
  def extractEntityData(response: HttpResponse): Source[ByteString, _] =
    response match {
      case HttpResponse(OK, _, entity, _) => entity.dataBytes
      case notOkResponse =>
        // An unread entity keeps the pooled connection occupied; drain it before failing.
        // The Materializer is derived from the implicit actorSystem.
        notOkResponse.discardEntityBytes()
        Source.failed(new RuntimeException(s"illegal response $notOkResponse"))
    }

  val future: Future[Done] =
    Source
      .single(httpRequest) //: HttpRequest
      .mapAsync(1)(Http()(actorSystem.toClassic).singleRequest(_)) //: HttpResponse
      .flatMapConcat(extractEntityData) //: ByteString
      .via(CsvParsing.lineScanner()) //: List[ByteString]
      .via(CsvToMap.toMapAsStrings()) //: Map[String, String]
      .runWith(Sink.foreach(println))

  // Report a failed stream instead of silently printing "Done!", then always shut down.
  future.onComplete { result =>
    result.failed.foreach(e => println(s"Stream failed: $e"))
    println("Done!")
    actorSystem.terminate()
  }
}