Step 4: Producing JSON
Description
The helper method `toJson` turns the maps into JSON by using Spray JSON (Scala) or Jackson (Java).
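In the Scala version, the resulting JSON structure is then converted to `String`s via `compactPrint`; in the Java version, `toJson` returns the JSON `String` directly.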
Code
- Java
-
source
/* * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com> */ package samples.javadsl; import akka.Done; import akka.actor.typed.ActorSystem; import akka.actor.typed.javadsl.Behaviors; import akka.http.javadsl.Http; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.model.MediaRanges; import akka.http.javadsl.model.StatusCodes; import akka.http.javadsl.model.headers.Accept; import akka.stream.alpakka.csv.javadsl.CsvParsing; import akka.stream.alpakka.csv.javadsl.CsvToMap; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import java.io.IOException; import java.io.StringWriter; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletionStage; import static akka.actor.typed.javadsl.Adapter.toClassic; public class Main { public static void main(String[] args) throws Exception { Main me = new Main(); me.run(); } final HttpRequest httpRequest = HttpRequest.create( "https://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download") .withHeaders(Collections.singletonList(Accept.create(MediaRanges.ALL_TEXT))); private Source<ByteString, ?> extractEntityData(HttpResponse httpResponse) { if (httpResponse.status() == StatusCodes.OK) { return httpResponse.entity().getDataBytes(); } else { return Source.failed(new RuntimeException("illegal response " + httpResponse)); } } private final JsonFactory jsonFactory = new JsonFactory(); private String toJson(Map<String, String> map) throws Exception { StringWriter sw = new StringWriter(); JsonGenerator generator = jsonFactory.createGenerator(sw); generator.writeStartObject(); map.forEach( (key, value) -> { try { generator.writeStringField(key, value); } catch (IOException e) { throw new RuntimeException(e); } }); 
generator.writeEndObject(); generator.close(); return sw.toString(); } private void run() throws Exception { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "alpakka-samples"); Http http = Http.get(toClassic(system)); CompletionStage<Done> completion = Source.single(httpRequest) // : HttpRequest .mapAsync(1, http::singleRequest) // : HttpResponse .flatMapConcat(this::extractEntityData) // : ByteString .via(CsvParsing.lineScanner()) // : List<ByteString> .via(CsvToMap.toMapAsStrings(StandardCharsets.UTF_8)) // : Map<String, String> .map(this::toJson) // : String .runWith(Sink.foreach(System.out::println), system); completion .thenAccept( done -> { System.out.println("Done!"); system.terminate(); }); } }
- Scala
-
source
/*
 * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com>
 */
package samples

import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges }
import akka.stream.alpakka.csv.scaladsl.{ CsvParsing, CsvToMap }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import spray.json.{ DefaultJsonProtocol, JsValue, JsonWriter }

import scala.concurrent.Future
import scala.util.{ Failure, Success }

/**
 * Sample that downloads a CSV document over HTTP, parses every line into a
 * `Map[String, String]`, renders each map as compact JSON and prints it.
 */
object Main extends App with DefaultJsonProtocol {

  implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "alpakka-samples")

  import actorSystem.executionContext

  /** Request for the CSV download, asking for any `text/*` media type. */
  val httpRequest: HttpRequest =
    HttpRequest(uri = "https://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download")
      .withHeaders(Accept(MediaRanges.`text/*`))

  /**
   * Returns the response body as a stream of bytes for a 200 OK response, or a
   * failed source for any other response.
   */
  def extractEntityData(response: HttpResponse): Source[ByteString, _] =
    response match {
      case HttpResponse(OK, _, entity, _) => entity.dataBytes
      // NOTE(review): the entity of the rejected response is not drained here; harmless for
      // this one-shot sample since the system is terminated right after - confirm before reuse.
      case notOkResponse =>
        Source.failed(new RuntimeException(s"illegal response $notOkResponse"))
    }

  /** Converts a map to a JSON value using the implicitly available writer. */
  def toJson(map: Map[String, String])(implicit jsWriter: JsonWriter[Map[String, String]]): JsValue =
    jsWriter.write(map)

  val future: Future[Done] =
    Source
      .single(httpRequest) //: HttpRequest
      .mapAsync(1)(Http()(actorSystem.toClassic).singleRequest(_)) //: HttpResponse
      .flatMapConcat(extractEntityData) //: ByteString
      .via(CsvParsing.lineScanner()) //: List[ByteString]
      .via(CsvToMap.toMapAsStrings()) //: Map[String, String]
      .map(toJson) //: JsValue
      .map(_.compactPrint) //: String (JSON formatted)
      .runWith(Sink.foreach(println))

  // Report the real outcome instead of printing "Done!" for a failed stream,
  // and shut the actor system down in either case.
  future.onComplete { outcome =>
    outcome match {
      case Success(_)     => println("Done!")
      case Failure(cause) => Console.err.println(s"Stream failed: $cause")
    }
    actorSystem.terminate()
  }
}