Meitu Big Data Platform Architecture Practice
This article comes from the 11th session of the Meitu Internet Technology Salon.
Nowadays, big data is used more and more widely across industries: operations teams rely on data to track operational effectiveness, product teams rely on data analysis to improve conversion rates, developers use data to measure the effect of system optimizations, and so on. The company has more than a dozen apps, such as Meitu, Meitu Xiuxiu, and Meifan Camera, and each of them relies on data for personalized recommendation, search, report analysis, anti-cheating, advertising, and more, so the overall business demand for data is large and its applications are wide-ranging.
The business background of the Meitu data technology team is therefore characterized by many business lines and a wide range of data applications. This is one of the primary reasons that prompted us to build a business-driven data platform.
Figure 1
Here are a few examples of data applications at Meitu. As shown in Figure 1, the first from the left is DataFace, Meitu's self-developed data visualization platform, which lets business users freely drag and drop to generate visual reports for efficient reporting and subsequent analysis. The second is the popular feed on the Meipai app's home page, where behavioral data is used to recommend a personalized list of videos each user may like. The third is anti-cheating: based on user cheating data, certain models and strategies are used to effectively identify and filter out cheating behavior. Beyond these, data is widely applied in search, A/B experiments, channel tracking, advertising, and more.
Currently, Meitu has 500 million monthly active users, who generate close to 20 billion behavioral log entries per day, so the overall volume is fairly large, with thousands of cluster machines and petabytes of accumulated historical data.
The relatively large number of business lines, the extensive use of data in each of them, and the large overall user base all drove us to build a data platform to grow these businesses and use data more efficiently.
/ Overall architecture of the Meitu data platform /
The overall architecture of our data platform is shown in Figure 2. For data collection, we built Arachnia, a server-side log collection system, along with client-side SDKs that each app integrates to report client-side data; we also have data integration (import and export) built on DataX; and the Mor crawler platform supports configurable crawling tasks for public web data.
Figure 2
The data storage layer chooses different storage solutions according to business characteristics; currently we mainly use HDFS, MongoDB, HBase, ES, etc. For data computation, offline computing is still largely based on Hive and MapReduce, real-time stream computing uses Storm and Flink, and there is also a self-developed bitmap system called Naix.
For data development we have built platforms such as a data workbench, data bus distribution, and task scheduling. The data visualization and application layer focuses on building a series of data application platforms around user requirements, including an A/B experiment platform, a channel promotion tracking platform, a data visualization platform, user profiling, and so on.
The right side of Figure 2 shows some of the underlying services that each component may rely on, including geolocation, metadata management, unique device identification, etc.
Figure 3 shows the basic data architecture flow, a typical Lambda architecture. Starting from the data sources on the left, Arachnia and the AppSDK report server-side and client-side data, respectively, to the proxy service collector, which parses the data protocol and writes the data to Kafka. The real-time stream then goes through a layer of data distribution, and finally the business consumes the Kafka data for real-time computation.
Figure 3
On the offline side, the ETL service is responsible for dumping data from Kafka to HDFS; heterogeneous data sources (such as MySQL, HBase, etc.) are imported and exported mainly with DataX and Sqoop; the data is then written to the various storage layers via Hive, Kylin, Spark, etc.; and finally a unified external API interfaces with business systems and our own visualization platform.
/ Stages of development of data platforms /
There are three main phases in the construction of an enterprise-level data platform.
Meitu is now in the transition period between the second and third stages: while continuing to improve data openness, we are also gradually improving query and analysis efficiency and beginning to consider how to optimize costs. The rest of this article focuses on our practice and optimization during the 0-to-1 and open data phases.
From 0 to 1
From 0 to 1 covers the journey of data from collection until it can finally be used. Figure 4 shows how our data collection evolved: at first we used free third-party platforms such as Umeng and Flurry; then we quickly switched to rsync to synchronize logs to a server for storage and computation; next we wrote a simple Python script to let business servers report logs; and finally we developed the server-side log collection system Arachnia and the client-side AppSDK.
Figure 4
Data collection is the source of all data and a critical link in the entire data chain. It requires particular attention to whether the data is complete, whether it supports real-time reporting, whether event tracking is standardized and accurate, and the cost of maintenance and management. Our log collection system therefore needs to meet the following requirements.
Based on the above requirements, we did not use Flume, Scribe, or Fluentd, and ultimately chose to develop our own collection system, Arachnia.
Figure 5
Figure 5 shows the simple architecture of Arachnia, which is centrally managed through a system "brain". The puppet module mainly aggregates agent metrics within a single IDC and relays forwarded metrics or hot configuration-change commands. The collection agents are installed and started by the ops platform; after starting, an agent pulls its configuration from the brain and begins collecting data and reporting it to the collector.
Moving on to Arachnia's practical optimizations, the first is the at-least-once reliability guarantee. Many systems write data that fails to be reported into a WAL and retry later so that failed reports are not lost. Our practice instead removes the WAL and adds a coordinator to centrally manage the transaction (tx) state.
Figure 6
The coordinator issues a txid before collection starts; the source receives the signal, starts collecting, and hands the data to the sink to send. After sending, the sink acks the tx to tell the coordinator it has committed; the coordinator checks and confirms, then sends a commit signal to the source and sink so they update their state; finally, when the tx finishes, the source persists the collection progress (by default to a local file). If something goes wrong in the first three steps, no data has been sent successfully, so nothing is duplicated; if the last four steps fail, the data may be duplicated and that tx is replayed.
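The handshake can be summarized in a short sketch. This is a minimal, illustrative model of the flow described above, not Arachnia's actual code; all class and method names here are assumptions.

```java
// Minimal sketch of the coordinator-driven at-least-once handshake described
// above. Class/method names are illustrative, not Arachnia's implementation.
public class TxCoordinatorSketch {

    // Persisted only after the whole tx succeeds; by default a local file.
    static final class Progress {
        long committedOffset;
    }

    interface Source {
        byte[] collect(long fromOffset);          // read a batch from the log file
        void commit(long txId, long newOffset);   // advance in-memory state on commit
        void persistProgress(Progress p);         // flush collection progress to disk
    }

    interface Sink {
        boolean send(long txId, byte[] batch);    // ship the batch to the collector, then ack
    }

    /** Runs one tx. A failure before send() causes no duplicates; a failure after
     *  send() replays the tx, so downstream must tolerate duplicates (at least once). */
    static void runTx(long txId, Source source, Sink sink, Progress progress) {
        byte[] batch = source.collect(progress.committedOffset);   // coordinator issued txId, source collects
        if (!sink.send(txId, batch)) {
            return;                                                // nothing sent, safe to retry the same tx
        }
        // coordinator confirms the ack, source/sink update state, progress is persisted
        long newOffset = progress.committedOffset + batch.length;
        source.commit(txId, newOffset);
        progress.committedOffset = newOffset;
        source.persistProgress(progress);
    }
}
```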
On top of the at-least-once reliability guarantee, some business parties also require uniqueness, so we support generating a unique identifier for every log entry. Another key practice of the collection system is to uniquely identify a file and give each log entry a unique MsgID, so that the business side can deduplicate downstream based on the MsgID when duplication occurs.
We started with the filename, but later found that many business parties rename their files. We then switched to the inode, but Linux recycles and reuses inode numbers. In the end we hash the inode together with the content of the file header to form the fileID, while the MsgID is uniquely determined by agentID & fileID & offset.
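As a rough illustration of this scheme (assuming a Linux file system; the hashing details, paths, and field layout are illustrative, not the actual Arachnia implementation):

```java
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;

// Rough illustration of the fileID / MsgID scheme (not the actual Arachnia code):
// fileID = hash(inode + file header bytes), MsgID = agentID & fileID & offset.
public class MsgIdSketch {

    static String fileId(Path logFile) throws Exception {
        // "unix:ino" is only available on Linux/Unix file systems.
        long inode = (Long) Files.getAttribute(logFile, "unix:ino");
        byte[] header = new byte[1024];              // first bytes of the file help
        try (InputStream in = Files.newInputStream(logFile)) {
            in.read(header);                         // distinguish recycled inodes
        }
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        md5.update(Long.toString(inode).getBytes(StandardCharsets.UTF_8));
        md5.update(header);
        StringBuilder hex = new StringBuilder();
        for (byte b : md5.digest()) hex.append(String.format("%02x", b));
        return hex.toString();
    }

    static String msgId(String agentId, String fileId, long offset) {
        return agentId + "&" + fileId + "&" + offset;
    }

    public static void main(String[] args) throws Exception {
        Path log = Paths.get("/var/log/app/access.log");   // hypothetical log path
        System.out.println(msgId("agent-01", fileId(log), 0L));
    }
}
```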
After the data is reported, the collector is responsible for parsing the protocol and pushing the data to Kafka. So how does data get from Kafka onto HDFS? Let us first look at Meitu's requirements.
Based on the pain points in the above requirements, Meitu's Kafka-to-HDFS data service is implemented as shown in Figure 7.
Figure 7
Leveraging the characteristics of Kafka and MapReduce, each partition of a Kafka topic is assembled into one mapper InputSplit, and a mapper process is started to consume that batch of Kafka data. After data parsing, business logic processing, and checksum filtering, the data is finally written to the target HDFS files according to the partition rules. The meta information of the run (topic, partition, start offset, end offset) is stored in MySQL after a successful landing. The next run reads from the end offset of the previous run and starts consuming a new batch of data.
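The core idea can be sketched in plain Java (no Hadoop classes; the names and fields are illustrative assumptions, not the actual job code): one split per partition, bounded by the end offset recorded in MySQL for the previous run and the partition's current latest offset.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of "one Kafka partition = one mapper InputSplit".
public class KafkaSplitSketch {

    static final class PartitionSplit {
        final String topic;
        final int partition;
        final long startOffset;   // = end offset of the previous successful run
        final long endOffset;     // = latest offset at job submission time
        PartitionSplit(String topic, int partition, long startOffset, long endOffset) {
            this.topic = topic;
            this.partition = partition;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    /** One split per partition; the mapper for a split consumes
     *  [startOffset, endOffset) and writes to HDFS by partition rules. */
    static List<PartitionSplit> buildSplits(String topic,
                                            long[] lastEndOffsets,   // loaded from the MySQL meta table
                                            long[] latestOffsets) {  // fetched from Kafka
        List<PartitionSplit> splits = new ArrayList<>();
        for (int p = 0; p < lastEndOffsets.length; p++) {
            if (latestOffsets[p] > lastEndOffsets[p]) {
                splits.add(new PartitionSplit(topic, p, lastEndOffsets[p], latestOffsets[p]));
            }
        }
        return splits;
    }
}
```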
After implementing the basic functionality, we inevitably ran into problems. For example, the data volumes of different business topics vary widely, so a job has to wait for the mapper handling the partition with the most data and the longest processing time before the whole job can finish. How do we solve this? One unwritten principle of system design is divide and conquer, and we applied a similar idea to the data skew problem.
Figure 8
First, partitions with smaller data volumes are merged into a single InputSplit, so that one mapper handles the data of several business partitions and ends up writing multiple files.
Figure 9
In addition, partitions with large data volumes can be split, dividing the same partition evenly among multiple mappers. This balances the mappers' workloads and copes better with sudden surges in business traffic.
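Putting the two together, the split planning might look like the following sketch (the thresholds, names, and record layout are assumptions for illustration, not the actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative skew handling: partitions below a threshold are merged into one
// combined split, oversized partitions are cut into several offset sub-ranges
// so that multiple mappers share one partition.
public class SkewAwareSplitSketch {

    record OffsetRange(String topic, int partition, long start, long end) {}

    static List<List<OffsetRange>> plan(List<OffsetRange> pending,
                                        long mergeThreshold,   // e.g. 1M messages
                                        long splitThreshold) { // e.g. 50M messages
        List<List<OffsetRange>> splits = new ArrayList<>();
        List<OffsetRange> small = new ArrayList<>();
        for (OffsetRange r : pending) {
            long size = r.end() - r.start();
            if (size <= mergeThreshold) {
                small.add(r);                          // merged later into one split
            } else if (size >= splitThreshold) {
                int pieces = (int) Math.ceil((double) size / splitThreshold);
                long step = (size + pieces - 1) / pieces;
                for (long s = r.start(); s < r.end(); s += step) {
                    long e = Math.min(s + step, r.end());
                    splits.add(List.of(new OffsetRange(r.topic(), r.partition(), s, e)));
                }
            } else {
                splits.add(List.of(r));                // one partition, one mapper
            }
        }
        if (!small.isEmpty()) splits.add(small);       // many small partitions, one mapper
        return splits;
    }
}
```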
Besides data skew, dumping data to HDFS can fail for all sorts of reasons, such as Kafka disk problems, Hadoop cluster node downtime, network failures, external access permissions, and so on. These can eventually leave HDFS files unclosed and therefore corrupted, requiring the data to be rerun. Since our data is partitioned roughly by day, the original approach could leave a whole day-granularity file corrupted and unreadable for parsing.
Figure 10
We therefore use a two-stage process: mapper 1 first writes the data to a temporary directory, and mapper 2 appends the data from the temporary HDFS directory to the target file. If mapper 1 fails, the batch can simply be rerun instead of rerunning the entire day's data; if mapper 2 fails, the final file can be rebuilt by merging the data from the temporary directory, avoiding a re-ETL at day granularity.
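A minimal sketch of this two-stage landing with the plain HDFS FileSystem API is shown below; the paths and batch naming are hypothetical, not the actual job code.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Stage 1 writes a batch under a temporary directory; stage 2 appends the
// temporary files to the day-level target file. Only the failed stage is rerun.
public class TwoStageLandingSketch {

    // Stage 1: a mapper writes its batch to /data/tmp/<topic>/<day>/<batchId>.
    static Path writeTemp(FileSystem fs, String topic, String day,
                          String batchId, byte[] data) throws Exception {
        Path tmp = new Path("/data/tmp/" + topic + "/" + day + "/" + batchId);
        try (FSDataOutputStream out = fs.create(tmp)) {
            out.write(data);
        }
        return tmp;
    }

    // Stage 2: append every temporary file of the batch to the final day file,
    // then remove the temporary directory. If this stage fails, the day file
    // can be rebuilt from the temporary directory instead of re-running ETL.
    static void appendToTarget(FileSystem fs, Path tmpDir, Path dayFile) throws Exception {
        if (!fs.exists(dayFile)) {
            fs.create(dayFile).close();
        }
        for (FileStatus status : fs.listStatus(tmpDir)) {
            try (FSDataInputStream in = fs.open(status.getPath());
                 FSDataOutputStream out = fs.append(dayFile)) {
                IOUtils.copyBytes(in, out, 64 * 1024, false);
            }
        }
        fs.delete(tmpDir, true);
    }

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path tmpDir = writeTemp(fs, "app_log", "20180601", "batch-0001", new byte[0]).getParent();
        appendToTarget(fs, tmpDir, new Path("/data/warehouse/app_log/20180601/data.log"));
    }
}
```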
The data written to kafka1 is basically the full data of each business, but on the demand side most businesses only care about a certain event or a small subset of the data, and no business consumes the full stream for its processing. So we added DataBus, a real-time distribution layer, to solve this problem.
Figure 11
DataBus lets the business side define its own distribution rules for writing data to downstream Kafka clusters, making it easy for business subscribers to process only the data they want and supporting data reuse at a finer granularity.
Figure 12
Figure 12 shows how DataBus is implemented; its main body is a Storm topology. DataBus has two spouts: one pulls the full set of rules as well as newly added rules and pushes them to the downstream distribution bolt to update its rule cache; the other consumes data from Kafka. The distribution bolt is mainly responsible for parsing the data, matching it against the rules, and sending it downstream to the target Kafka clusters.
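A simplified sketch of such a distribution topology on Storm is shown below; the spout implementations, field names, rule format, and Kafka producer wrapper are assumptions for illustration, not the actual DataBus code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

// Illustrative DataBus-style topology: a rules spout broadcasts rules to the
// distribution bolt, which matches each log against the cached rules and
// forwards hits to the downstream Kafka topic configured by the rule.
public class DatabusTopologySketch {

    public static class DistributionBolt extends BaseRichBolt {
        // ruleId -> {event filter, downstream topic}; refreshed by the rules spout.
        private final Map<String, String[]> rules = new ConcurrentHashMap<>();
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext ctx, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            if ("rules-spout".equals(tuple.getSourceComponent())) {
                // Rule update broadcast to every bolt task via allGrouping.
                rules.put(tuple.getStringByField("ruleId"),
                          new String[]{tuple.getStringByField("eventFilter"),
                                       tuple.getStringByField("targetTopic")});
            } else {
                String log = tuple.getStringByField("value");
                for (String[] rule : rules.values()) {
                    if (log.contains(rule[0])) {
                        sendToDownstreamKafka(rule[1], log);   // assumed producer wrapper
                    }
                }
            }
            collector.ack(tuple);
        }

        private void sendToDownstreamKafka(String topic, String log) {
            // A real implementation would hold a KafkaProducer and send(topic, log).
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) { /* terminal bolt */ }
    }

    static TopologyBuilder buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("rules-spout", new RulesSpout(), 1);     // pulls full + incremental rules
        builder.setSpout("kafka-spout", new LogKafkaSpout(), 8);  // consumes the upstream Kafka
        builder.setBolt("distribution-bolt", new DistributionBolt(), 16)
               .allGrouping("rules-spout")                        // every task caches all rules
               .shuffleGrouping("kafka-spout");
        return builder;
    }

    // Placeholders for the two spouts described above.
    static class RulesSpout extends BaseRichSpout {
        public void open(Map conf, TopologyContext ctx, SpoutOutputCollector collector) {}
        public void nextTuple() {}
        public void declareOutputFields(OutputFieldsDeclarer d) {
            d.declare(new Fields("ruleId", "eventFilter", "targetTopic"));
        }
    }
    static class LogKafkaSpout extends BaseRichSpout {
        public void open(Map conf, TopologyContext ctx, SpoutOutputCollector collector) {}
        public void nextTuple() {}
        public void declareOutputFields(OutputFieldsDeclarer d) {
            d.declare(new Fields("value"));
        }
    }
}
```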
Open Data
With raw data available and the ability to do offline and real-time data development, demand for data development grew explosively and the data development team was overwhelmed. So we opened up our computing and storage capabilities through the data platform, empowering the business side to do its own data development.
We will not elaborate on the implementation of metadata management, task scheduling, data integration, DAG task scheduling, visualization, and so on, but will mainly share Meitu's practical experience with stability after opening up its data.
Data openness and system stability have a love-hate relationship. On the one hand, once the platform is open, data work is no longer done only by engineers with a data background, and we frequently receive submitted tasks that are illegal or consume excessive resources, which causes serious problems for the stability of the underlying computing and storage clusters. On the other hand, it is precisely data openness that keeps pushing us to improve system stability.
For the many high-resource and illegal tasks, we first considered whether checks and constraints could be applied at the HiveSQL level. Figure 13 shows the stages of the process by which HiveSQL is parsed and compiled into an executable MR job:
Figure 13
First, Antlr parses the SQL and generates an AST; then semantic analysis turns the AST into a QueryBlock (a Java object). A logical plan is generated from the QueryBlock and logically optimized; finally a physical plan is generated, physically optimized, and converted into an executable MR task.
We hook in mainly after the QueryBlock is generated in the semantic analysis phase, where we perform a series of statement checks, including checks for illegal operations, query condition restrictions, and high resource consumption.
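For illustration, a check of this kind can also be expressed through Hive's standard semantic analyzer hook extension point (enabled via hive.semantic.analyzer.hook), which works at the AST level; Meitu's actual checks run after the QueryBlock is generated inside the compile pipeline. The WHERE-clause rule below is only a sketch, not the actual check logic.

```java
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

// Illustrative AST-level check plugged in via hive.semantic.analyzer.hook:
// reject queries that contain no WHERE clause (i.e. unfiltered full scans).
public class QueryCheckHookSketch extends AbstractSemanticAnalyzerHook {

    @Override
    public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
            throws SemanticException {
        if (containsToken(ast, HiveParser.TOK_QUERY) && !containsToken(ast, HiveParser.TOK_WHERE)) {
            throw new SemanticException(
                "Rejected: full-table scan without a filter condition is not allowed");
        }
        return ast;
    }

    /** Depth-first search of the AST for a given token type. */
    private boolean containsToken(Tree node, int tokenType) {
        if (node == null) return false;
        if (node.getType() == tokenType) return true;
        for (int i = 0; i < node.getChildCount(); i++) {
            if (containsToken(node.getChild(i), tokenType)) return true;
        }
        return false;
    }
}
```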
The second stability practice focuses on optimizing the clusters themselves.
The last part of our platform stability practice is strengthening permissions and security, to prevent unauthorized access to the clusters and data, attacks, and so on. The permission work comes in two main pieces: API access and the clusters.
Figure 14
These are some of the practices and optimizations Meitu has made for the stability of its data platform after building it and opening it to the various business lines.
To close, here is a brief summary of the data platform building process.
(end)