المقدمة
لتحليل تدفق البيانات ، تحتاج إلى مصادر هذه البيانات. المعلومات التي توفرها المصادر مهمة أيضًا. ومصادر المعلومات النصية ، على سبيل المثال ، نادرة أيضًا.
تشمل المصادر المثيرة للاهتمام ما يلي: twitter ، vk . لكن هذه المصادر ليست مناسبة لجميع المهام.
هناك مصادر بالبيانات المطلوبة ، لكن هذه المصادر لا تتدفق. يمكن الاقتباس من الروابط التالية هنا: public-apis .
يمكنك استخدام الطريقة القديمة عند حل مشاكل تدفق البيانات.
تحميل البيانات وإرسالها إلى تيار.
على سبيل المثال ، يمكنك استخدام المصدر التالي: imdb .
وتجدر الإشارة إلى أن imdb يوفر البيانات من تلقاء نفسه. انظر IMDb Datasets . ولكن يمكن الافتراض أن البيانات التي تم جمعها تحتوي مباشرة على معلومات أكثر صلة.
اللغة: Java 1.8.
المكتبات: kafka 2.6.0، jsoup 1.13.1.
جمع البيانات
جمع البيانات هي خدمة تقوم ، وفقًا لبيانات الإدخال ، بتحميل صفحات html ، والبحث عن المعلومات الضرورية وتحويلها إلى مجموعة من الكائنات.
إذن مصدر البيانات: imdb . سيتم جمع المعلومات حول الأفلام وسيتم استخدام الطلب التالي: https://www.imdb.com/search/title/؟release_date=٪s،٪s&countries=٪s
حيث 1 ، 2 المعلمات هي التواريخ. المعلمة 3 - البلدان.
لفهم مصدر البيانات بشكل أفضل ، يمكنك الرجوع إلى المورد التالي: imdb-wide-dataset .
واجهة الخدمة:
public interface MovieDirectScrapingService {
Collection<Movie> scrap();
}
فئة الفيلم هي فئة تحتوي على معلومات حول فيلم واحد (أو عرض ، وما إلى ذلك).
class Movie {
public final String titleId;
public final String titleUrl;
public final String title;
public final String description;
public final Double rating;
public final String genres;
public final String runtime;
public final String baseUrl;
public final String baseNameUrl;
public final String baseTitleUrl;
public final String participantIds;
public final String participantNames;
public final String directorIds;
public final String directorNames;
…
تحليل البيانات في صفحة واحدة.
يتم جمع المعلومات بالطريقة التالية. يتم تحميل البيانات باستخدام jsoup. بعد ذلك ، يتم البحث عن عناصر html المطلوبة وتحويلها إلى نسخ فيلم.
String scrap(String url, List<Movie> items) {
Document doc = null;
try {
doc = Jsoup.connect(url).header("Accept-Language", language).get();
} catch (IOException e) {
e.printStackTrace();
}
if (doc != null) {
collectItems(doc, items);
return nextUrl(doc);
}
return "";
}
ابحث عن ارتباط للصفحة التالية.
String nextUrl(Document doc) {
Elements nextPageElements = doc.select(".next-page");
if (nextPageElements.size() > 0) {
Element hrefElement = nextPageElements.get(0);
return baseUrl + hrefElement.attributes().get("href");
}
return "";
}
. . . , . .
@Override
public Collection<Movie> scrap() {
String url = String.format(
baseUrl + "/search/title/?release_date=%s,%s&countries=%s",
startDate, endDate, countries
);
List<Movie> items = new ArrayList<>();
String nextUrl = url;
while (true) {
nextUrl = scrap(nextUrl, items);
if ("".equals(nextUrl)) {
break;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
}
return items;
}
.
: MovieProducer. : run.
. . .
public void run() {
try (SimpleStringStringProducer producer = new SimpleStringStringProducer(
bootstrapServers, clientId, topic)) {
Collection<Data.Movie> movies = movieDirectScrapingService.scrap();
List<SimpleStringStringProducer.KeyValueStringString> kvList = new ArrayList<>();
for (Data.Movie move : movies) {
Map<String, String> map = new HashMap<>();
map.put("title_id", move.titleId);
map.put("title_url", move.titleUrl);
…
String value = JSONObject.toJSONString(map);
String key = UUID.randomUUID().toString();
kvList.add(new SimpleStringStringProducer.KeyValueStringString(key, value));
}
producer.produce(kvList);
}
}
. .
: MovieDirectScrapingExecutor. : run.
. .
public void run() {
int countriesCounter = 0;
List<String> countriesSource = Arrays.asList("us");
while (true) {
try {
LocalDate localDate = LocalDate.now();
int year = localDate.getYear();
int month = localDate.getMonthValue();
int day = localDate.getDayOfMonth();
String monthString = month < 9 ? "0" + month : Integer.toString(month);
String dayString = day < 9 ? "0" + day : Integer.toString(day);
String startDate = year + "-" + monthString + "-" + dayString;
String endDate = startDate;
String language = "en";
String countries = countriesSource.get(countriesCounter);
execute(language, startDate, endDate, countries);
Thread.sleep(1000);
countriesCounter += 1;
if (countriesCounter >= countriesSource.size()) {
countriesCounter = 0;
}
} catch (InterruptedException e) {
}
}
}
MovieDirectScrapingExecutor, , , main.
.
{
"base_name_url": "https:\/\/www.imdb.com\/name",
"participant_ids": "nm7947173~nm2373827~nm0005288~nm0942193~",
"title_id": "tt13121702",
"rating": "0.0",
"base_url": "https:\/\/www.imdb.com",
"description": "It's Christmas time and Jackie (Carly Hughes), an up-and-coming journalist, finds that her life is at a crossroads until she finds an unexpected opportunity - to run a small-town newspaper ... See full summary »",
"runtime": "",
"title": "The Christmas Edition",
"director_ids": "nm0838289~",
"title_url": "\/title\/tt13121702\/?ref_=adv_li_tt",
"director_names": "Peter Sullivan~",
"genres": "Drama, Romance",
"base_title_url": "https:\/\/www.imdb.com\/title",
"participant_names": "Carly Hughes~Rob Mayes~Marie Osmond~Aloma Wright~"
}
.
, , -. kafka-.
. Apache Kafka Kafka Server.
: MovieProducerTest.
public class MovieProducerTest {
@Test
void simple() throws InterruptedException {
String brokerHost = "127.0.0.1";
int brokerPort = 29092;
String zooKeeperHost = "127.0.0.1";
int zooKeeperPort = 22183;
String bootstrapServers = brokerHost + ":" + brokerPort;
String topic = "q-data";
String clientId = "simple";
try (KafkaServerService kafkaServerService = new KafkaServerService(
brokerHost, brokerPort, zooKeeperHost, zooKeeperPort
)
) {
kafkaServerService.start();
kafkaServerService.createTopic(topic);
MovieDirectScrapingService movieDirectScrapingServiceImpl = () -> Collections.singleton(
new Data.Movie(…)
);
MovieProducer movieProducer =
new MovieProducer(bootstrapServers, clientId, topic, movieDirectScrapingServiceImpl);
movieProducer.run();
kafkaServerService.poll(topic, "simple", 1, 5, (records) -> {
assertTrue(records.count() > 0);
ConsumerRecord<String, String> record = records.iterator().next();
JSONParser jsonParser = new JSONParser();
JSONObject jsonObject = null;
try {
jsonObject = (JSONObject) jsonParser.parse(record.value());
} catch (ParseException e) {
e.printStackTrace();
}
assertNotNull(jsonObject);
…
});
Thread.sleep(5000);
}
}
}
, , . .