Step 2: extract HTTP entity
Description
A response with HTTP status 200 (OK) is expected; any other status fails the stream. The entity contained in the response is extracted, so from this step on the stream no longer carries the HTTP response
but the entity's data (the page content) as ByteString elements.
Code
- Java
-
source
/*
 * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com>
 */

package samples.javadsl;

import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.MediaRanges;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.headers.Accept;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

import java.util.Collections;
import java.util.concurrent.CompletionStage;

import static akka.actor.typed.javadsl.Adapter.toClassic;

/**
 * Step 2 of the sample: issues a single HTTP request, extracts the response entity and prints
 * every received chunk to standard output.
 */
public class Main {

  public static void main(String[] args) throws Exception {
    Main me = new Main();
    me.run();
  }

  /** The request to send; it asks for any {@code text/*} media type. */
  final HttpRequest httpRequest =
      HttpRequest.create(
              "https://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download")
          .withHeaders(Collections.singletonList(Accept.create(MediaRanges.ALL_TEXT)));

  // Held as a field so that extractEntityData can discard the entity of rejected responses.
  private final ActorSystem<Void> system =
      ActorSystem.create(Behaviors.empty(), "alpakka-samples");

  /**
   * Turns an HTTP response into the stream of its entity bytes.
   *
   * @param httpResponse the response received for {@link #httpRequest}
   * @return the entity's data bytes if the status is OK; otherwise a source that fails with a
   *     {@link RuntimeException} describing the response
   */
  private Source<ByteString, ?> extractEntityData(HttpResponse httpResponse) {
    // Compare by value instead of by reference.
    if (StatusCodes.OK.equals(httpResponse.status())) {
      return httpResponse.entity().getDataBytes();
    } else {
      // The entity of a response must always be consumed or discarded, even when the response
      // itself is rejected.
      httpResponse.discardEntityBytes(system);
      return Source.failed(new RuntimeException("illegal response " + httpResponse));
    }
  }

  /**
   * Runs the stream and terminates the actor system once the stream has completed, whether it
   * succeeded or failed.
   */
  private void run() throws Exception {
    Http http = Http.get(toClassic(system));

    CompletionStage<Done> completion =
        Source.single(httpRequest) // : HttpRequest
            .mapAsync(1, http::singleRequest) // : HttpResponse
            .flatMapConcat(this::extractEntityData) // : ByteString
            .runWith(Sink.foreach(bs -> System.out.println(bs.utf8String())), system);

    // whenComplete (unlike thenAccept) also runs when the stream failed, so the actor system is
    // always shut down and the JVM can exit.
    completion.whenComplete(
        (done, failure) -> {
          if (failure == null) {
            System.out.println("Done!");
          } else {
            System.err.println("Stream failed: " + failure);
          }
          system.terminate();
        });
  }
}
- Scala
-
source
/*
 * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com>
 */

package samples

import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString

import scala.concurrent.Future

/**
 * Step 2 of the sample: issues a single HTTP request, extracts the response entity and prints
 * every received chunk to standard output.
 */
object Main extends App {

  implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "alpakka-samples")

  import actorSystem.executionContext

  /** The request to send; it asks for any `text/*` media type. */
  val httpRequest = HttpRequest(uri = "https://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download")
    .withHeaders(Accept(MediaRanges.`text/*`))

  /**
   * Turns an HTTP response into the stream of its entity bytes.
   *
   * @param response the response received for [[httpRequest]]
   * @return the entity's data bytes if the status is OK; otherwise a source that fails with a
   *         `RuntimeException` describing the response
   */
  def extractEntityData(response: HttpResponse): Source[ByteString, _] =
    response match {
      case HttpResponse(OK, _, entity, _) => entity.dataBytes
      case notOkResponse =>
        // The entity of a response must always be consumed or discarded, even when the response
        // itself is rejected.
        notOkResponse.discardEntityBytes()
        Source.failed(new RuntimeException(s"illegal response $notOkResponse"))
    }

  val future: Future[Done] =
    Source
      .single(httpRequest) //: HttpRequest
      .mapAsync(1)(Http()(actorSystem.toClassic).singleRequest(_)) //: HttpResponse
      .flatMapConcat(extractEntityData) //: ByteString
      // Print the decoded text rather than ByteString's own toString rendering.
      // NOTE: every chunk is decoded on its own, so a multi-byte character that is split across
      // two chunks would not be printed correctly.
      .runWith(Sink.foreach[ByteString](bytes => println(bytes.utf8String)))

  // Report the outcome and shut the actor system down whether the stream succeeded or failed.
  future.onComplete { result =>
    result.fold(
      cause => Console.err.println(s"Stream failed: $cause"),
      _ => println("Done!")
    )
    actorSystem.terminate()
  }
}