Meitu Big Data Platform Architecture Practice

This article comes from the 11th session of the Meitu Internet Technology Salon.

Big data is now widely used across industries: operations teams use data to track operational effectiveness, product teams use data analysis to track conversion rates, and developers use data to measure the effect of system optimizations. The company has more than a dozen apps, such as Meitu, Meitu Xiuxiu, and Meifan Camera, each of which relies on data for personalized recommendation, search, report analysis, anti-cheating, and advertising, so the overall business demand for data is large and its applications are broad.

The business background of the Meitu data technology team is therefore defined by many lines of business and a wide range of data applications. This is one of the primary reasons we built the data platform: it is driven by the business.

Figure 1

Here are a few examples of data applications at Meitu. As shown in Figure 1, the first from the left is DataFace, Meitu's self-developed data visualization platform, which lets business teams build visual reports by free drag and drop for efficient reporting and follow-up analysis. The second is the home page of the Meitai app, whose popular feed uses personalized recommendation: behavioral data is used to recommend a list of videos that the user is likely to enjoy. The third is anti-cheating: based on data about users' cheating behavior, certain models and strategies are applied to identify and filter out cheating effectively. Beyond these, data is widely used in search, A/B experiments, channel tracking, advertising, and more.

Meitu currently has 500 million monthly active users, who generate close to 20 billion behavioral records per day. The overall volume is therefore large: the clusters run on thousands of machines and hold petabytes of historical data.

Many lines of business, extensive use of data within each of them, and a large overall user base: together these factors pushed us to build a data platform that drives business growth and makes the use of data more efficient.

/ Overall architecture of the Meitu data platform /

The overall architecture of our data platform is shown in Figure 2. For data collection, we built Arachnia, a server-side log collection system, and provide a client SDK that each app integrates to collect client-side data. We also have data integration (import and export) implemented on top of DataX, and the Mor crawler platform supports configurable development of tasks that crawl public web data.

Figure 2

The data storage layer chooses different storage solutions according to business characteristics; at present we mainly use HDFS, MongoDB, HBase, ES, and others. For data computation, offline computing is still largely based on Hive and MR, real-time stream computing uses Storm and Flink, and there is also Naix, a self-developed bitmap system.

For data development we have built platforms such as the data workshop, data bus distribution, and task scheduling. The data visualization and application layer consists of a series of data application platforms built around user requirements, including the A/B experiment platform, the channel promotion tracking platform, the data visualization platform, user profiling, and so on.

The right side of Figure 2 shows some of the underlying services that each component may rely on, including geolocation, metadata management, unique device identification, etc.

Figure 3 shows the basic data flow, a typical lambda architecture. Starting from data collection on the left, Arachnia and the AppSDK report server-side and client-side data respectively to the proxy service, the collector. The collector parses the data protocol and writes the data to Kafka. The real-time stream then passes through a data distribution layer, and finally the business consumes the Kafka data for real-time computing.

Figure 3

On the offline side, the ETL service is responsible for dumping data from Kafka to HDFS, while heterogeneous data sources (such as MySQL and HBase) are imported and exported mainly with DataX and Sqoop. The data is then processed with Hive, Kylin, Spark, and so on, and written to the various storage layers. Finally, a unified external API connects the data to business systems and to our own visualization platform.

/ Stages of development of data platforms /

There are three main phases in the construction of an enterprise-level data platform.

  • At first, a company typically relies on free third-party platforms. The advantage of this phase is fast integration and immediate access to some statistical metrics for the app, but the downside is equally obvious: there is no raw data, so apart from the basic indicators the third party provides, other analysis, recommendation, and so on are impossible. This leads to the first phase, from 0 to 1, whose goal is to have our own data to work with.
  • Once data is available, business lines and requirements multiply, and development efficiency has to improve. More people need to take part in data development and use the data, not only dedicated data developers. This means opening up data, computing, and storage capabilities to the lines of business rather than keeping them in the data team's own hands.
  • Once data has been opened up, the business side starts asking whether data tasks can run faster, whether results can come back in seconds, and whether they can be closer to real time. At the same time, the clusters keep growing to meet business needs, so we start thinking about how to save resources while still satisfying the business.

Meitu is now in the transition period between the second and third stages, and is constantly improving the openness of its data while also gradually improving the efficiency of query analysis, as well as beginning to consider how to optimize costs. The next section will focus on the practice and optimization of our platform during the 0 to 1 and open data phases.

From 0 to 1

The 0 to 1 phase covers everything from the moment data is collected to the moment it is finally available for use. Figure 4 shows the evolution of data collection: from free third-party platforms such as Umeng and Flurry at the beginning, to a quick solution that used rsync to synchronize logs to a server for storage and computation, to a quickly developed simple Python script that let business servers report logs, and finally to our own server-side log collection system Arachnia and the client-side AppSDK.

Figure 4

Data collection is the source of all data and an important link in the data chain. It requires attention to whether the data is complete, whether real-time reporting is supported, whether event tracking points are standardized and accurate, and how much maintenance and management cost. Our log collection system therefore needs to meet the following requirements.

  • Integrated management and maintenance, including automated deployment, installation, upgrade, and uninstallation of the Agent, hot configuration changes, and latency monitoring.
  • At-least-once delivery as the reliability guarantee.
  • Meitu now has multiple IDCs, so the system must support collecting data from multiple IDCs and aggregating it into the data center.
  • Resource consumption as low as possible, with as little disruption to the business as possible.

Given these requirements, we did not adopt Flume, Scribe, or Fluentd, and instead chose to develop our own collection system, Arachnia.

Figure 5

Figure 5 shows a simplified architecture of Arachnia, which is centrally managed through the system brain. The puppet module aggregates Agent metrics within a single IDC and relays metrics upward and hot configuration change commands downward. The collector Agent is installed and started by the ops platform; it then pulls its configuration from the brain and starts collecting data and reporting it to the collector.

Moving on to Arachnia's practical optimizations, the first is the at-least-once reliability guarantee. Quite a few systems use a WAL to record data that failed to report, then retry and report it again so that failed reports are not lost. Our approach is to remove the WAL and add a coordinator that issues and manages tx state in a unified way.

Figure 6

Before collection starts, the coordinator issues a txid. The source receives the signal, starts collecting, and hands the data to the sink to send. After sending, the sink acks the tx, telling the coordinator that it has committed. The coordinator verifies this and then sends a commit signal to the source and the sink so that they update their state. Finally, when the tx finishes, the source writes the collection progress to the persistence layer (a local file by default). If a problem occurs in the first 3 steps, the data has not been sent successfully, so nothing is duplicated; if any of the last 4 steps fails, that tx is replayed and the data is sent again, so duplicates are possible.
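The flow above can be sketched in a few lines of Python. This is a minimal in-memory illustration, not Arachnia's actual code: the class names `Coordinator` and `Agent`, the method names, and the failure flags are all hypothetical, and the `delivered` list merely stands in for the remote collector.

```python
class Coordinator:
    """Issues transaction ids and tracks their state (hypothetical API)."""

    def __init__(self):
        self.next_txid = 1
        self.state = {}

    def begin(self):
        txid = self.next_txid
        self.next_txid += 1
        self.state[txid] = "started"
        return txid

    def ack(self, txid):
        self.state[txid] = "acked"

    def commit(self, txid):
        if self.state.get(txid) != "acked":
            raise RuntimeError("tx %d was never acked" % txid)
        self.state[txid] = "committed"


class Agent:
    """Source and sink in one object, kept in memory for illustration."""

    def __init__(self, lines, coordinator, batch_size=2):
        self.lines = lines
        self.coordinator = coordinator
        self.batch_size = batch_size
        self.offset = 0       # collection progress (a local file in the article)
        self.delivered = []   # stands in for the remote collector

    def run_once(self, fail_before_send=False, fail_after_send=False):
        txid = self.coordinator.begin()
        batch = self.lines[self.offset:self.offset + self.batch_size]
        if fail_before_send:
            return False      # nothing was sent, so nothing can be duplicated
        self.delivered.extend(batch)
        if fail_after_send:
            return False      # sent, but progress not saved: batch is replayed
        self.coordinator.ack(txid)
        self.coordinator.commit(txid)
        self.offset += len(batch)   # progress advances only after commit
        return True
```

Because the progress offset only moves after the commit, a failure at any earlier point causes the same batch to be collected again on the next run, which is exactly the at-least-once behavior: no loss, but possible duplicates.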

Because the guarantee is at-least-once, duplicates can occur, and some business teams require uniqueness, so we support generating a unique ID for each log record on our side. The key practice here is to locate each file uniquely and give each log record a unique MsgID, so that downstream the business side can deduplicate logs by MsgID when duplication occurs.

We started out using the filename, but later found that many business teams change filenames, so we switched to the inode. However, Linux recycles and reuses inodes, so in the end we hash the inode together with the content of the file header and use the result as the fileID. The MsgID is then uniquely determined by agentID & fileID & offset.
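A minimal sketch of this identification scheme is shown below. The article does not say which hash function, how many header bytes, or what ID layout Arachnia uses, so SHA-1, the 256-byte header, the `:` separator, and the function names `file_id` and `msg_id` are all assumptions made for illustration.

```python
import hashlib
import os


def file_id(path, header_bytes=256):
    """Hash the inode together with the first bytes of the file.

    The header size and the use of SHA-1 are illustrative assumptions.
    """
    inode = os.stat(path).st_ino
    with open(path, "rb") as f:
        header = f.read(header_bytes)
    digest = hashlib.sha1()
    digest.update(str(inode).encode("ascii"))
    digest.update(b"\0")      # separator between inode and header bytes
    digest.update(header)
    return digest.hexdigest()


def msg_id(agent_id, fid, offset):
    """Combine agent id, file id, and byte offset into one message id."""
    return "%s:%s:%d" % (agent_id, fid, offset)
```

With this scheme a renamed log file keeps its fileID (the inode and header are unchanged), while a new file that happens to reuse a recycled inode gets a different fileID as long as its header content differs.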

After the data is reported, the collector parses the protocol and pushes the data to Kafka. So how does the data get from Kafka onto HDFS? First, let us look at Meitu's requirements.

  • Support for distributed processing.
  • Many lines of business are involved, and with them many data formats, so serialization of multiple formats must be supported, including JSON, Avro, special delimiters, and so on.
  • Support for rerunning when data fails to land because of machine failures, service issues, and the like. Reruns must be reasonably fast, because a failure at this stage affects data use across all downstream lines of business.
  • Support for configurable HDFS partitioning policies, so that each line of business can have its own flexible partitioning configuration.
  • Support for some special business logic, including data validation, expiry filtering, test data filtering, injection, and so on.

Based on these requirements and pain points, Meitu's implementation of the Kafka-to-HDFS data service is shown in Figure 7.

Figure 7

Building on the characteristics of Kafka and MR, we assemble one mapper InputSplit for each partition of a Kafka topic and start one mapper process to consume that batch of Kafka data. After data parsing, business logic processing, and validation filtering, the data is written to the target HDFS file according to the partitioning rules. Once the data has landed successfully, the meta information of this run (topic, partition, start offset, end offset) is stored in MySQL. The next run reads messages starting from the end offset of the previous run and begins consuming and landing a new batch.
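The offset bookkeeping described here can be sketched as follows. This is only an illustration: it uses Python's built-in `sqlite3` as a stand-in for MySQL, and the table name `batch_meta`, its columns, and the helper names are hypothetical.

```python
import sqlite3


def open_meta(path=":memory:"):
    """Open the metadata store (SQLite here, MySQL in the article)."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS batch_meta ("
        "topic TEXT, part INTEGER, start_offset INTEGER, end_offset INTEGER)"
    )
    return conn


def next_start_offset(conn, topic, part):
    """Return where the next batch should start for one topic partition."""
    row = conn.execute(
        "SELECT MAX(end_offset) FROM batch_meta WHERE topic = ? AND part = ?",
        (topic, part),
    ).fetchone()
    return row[0] if row[0] is not None else 0


def record_batch(conn, topic, part, start, end):
    """Record a batch only after its data has landed successfully."""
    with conn:  # commits on success, rolls back on exception
        conn.execute(
            "INSERT INTO batch_meta VALUES (?, ?, ?, ?)",
            (topic, part, start, end),
        )
```

Since a batch is recorded only after a successful landing, a failed run leaves the stored offsets untouched and the next run simply consumes the same range again.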

Once the basic functionality was in place, problems inevitably appeared. For example, data volumes differ widely between business topics, so a job has to wait for the mapper handling the largest partition, which takes the longest, before the whole job can finish. How do we solve this? One of the unwritten principles of system design is divide and conquer, and we apply a similar idea to this data skew problem.

Figure 8

First, partitions with small data volumes are merged into one InputSplit, so that one mapper handles the data of several business partitions and ends up writing several files.

Figure 9

In addition, partitions with large data volumes can be split into segments and divided evenly across several mappers that process the same partition. This makes mapper workloads more balanced and copes better with sudden surges in business volume.
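The merge-and-split idea can be sketched as a small planning function. It is a simplified illustration rather than Meitu's implementation: the function name `plan_splits`, the `target` threshold (messages per mapper), and the tuple layout are assumptions.

```python
def plan_splits(pending, target):
    """Group offset ranges into mapper input splits.

    pending: {(topic, partition): (start_offset, end_offset)}
    target:  rough number of messages one mapper should handle
    Returns a list of splits; each split is a list of
    ((topic, partition), start, end) ranges for one mapper.
    """
    splits = []
    small, small_total = [], 0
    for tp, (start, end) in sorted(pending.items()):
        size = end - start
        if size <= 0:
            continue
        if size > target:
            # Large partition: divide it evenly across several mappers.
            pieces = -(-size // target)   # ceiling division
            step = -(-size // pieces)
            pos = start
            while pos < end:
                nxt = min(pos + step, end)
                splits.append([(tp, pos, nxt)])
                pos = nxt
        else:
            # Small partition: pack it together with other small ones.
            if small and small_total + size > target:
                splits.append(small)
                small, small_total = [], 0
            small.append((tp, start, end))
            small_total += size
    if small:
        splits.append(small)
    return splits
```

For example, with `target=100`, two partitions holding 10 and 30 messages share one mapper, while a partition holding 1000 messages is spread over ten mappers of 100 messages each.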

Beyond data skew, dumping data to HDFS can fail for many reasons, such as Kafka disk problems, Hadoop cluster node downtime, network failures, or external access permission issues. These can leave HDFS files unclosed and therefore corrupted, which requires a data rerun. Our time partitions are basically daily, so with the original approach a single failure could leave a whole day's file corrupted and impossible to parse.

Figure 10

We use two-stage processing: mapper 1 first writes the data to a temporary directory, and mapper 2 appends the data from the temporary HDFS directory to the target file. When mapper 1 fails, only that batch needs to be rerun rather than the whole day's data. When mapper 2 fails, the final file can be replaced by merging the data directly from the temporary directory, which avoids repeating ETL at day granularity.
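The two stages and the recovery path can be sketched as follows. This illustration uses the local filesystem in place of HDFS, and the function names, the `batch-NNNNNN` file naming, and the `.rebuilding` suffix are hypothetical.

```python
import os


def stage1_write_batch(tmp_dir, batch_id, records):
    """Stage 1: write one batch to its own file in a temporary directory.

    Rerunning a failed batch simply overwrites that batch's file.
    """
    os.makedirs(tmp_dir, exist_ok=True)
    path = os.path.join(tmp_dir, "batch-%06d" % batch_id)
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(record + "\n")
    return path


def stage2_append(batch_path, target_path):
    """Stage 2: append a finished batch file to the day-level target file."""
    with open(batch_path, "r", encoding="utf-8") as src, \
            open(target_path, "a", encoding="utf-8") as dst:
        dst.write(src.read())


def rebuild_target(tmp_dir, target_path):
    """Recovery: rebuild a corrupted target by merging all batch files."""
    rebuilt = target_path + ".rebuilding"
    with open(rebuilt, "w", encoding="utf-8") as dst:
        for name in sorted(os.listdir(tmp_dir)):
            with open(os.path.join(tmp_dir, name), "r", encoding="utf-8") as src:
                dst.write(src.read())
    os.replace(rebuilt, target_path)  # swap in the rebuilt file
```

The point of the design is that the temporary directory always holds complete, closed batch files, so neither kind of failure forces the whole day's data to be consumed from Kafka again.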

The data written to kafka1 is basically the full data set of each business. On the consuming side, however, most businesses care only about a particular event or a small category of data, and hardly any business consumes the full data set. We therefore added Databus, a real-time distribution layer, to solve this problem.

Figure 11

Databus lets business teams define their own distribution rules for writing data to downstream Kafka clusters. This makes it easy for each business to subscribe to and process just the data it wants, and supports data reuse at a finer granularity.

Figure 12

Figure 12 shows how Databus is implemented: its main body is a Databus topology built on Storm. Databus has two spouts. One pulls the full rule set as well as newly added rules and passes them to the downstream distribution bolt, which updates its rule cache; the other consumes data from Kafka. The distribution bolt is mainly responsible for parsing data, matching rules, and sending data to the downstream Kafka cluster.
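The rule matching done by the distribution bolt can be illustrated with a small sketch. It is plain Python with no Storm or Kafka dependency; the class name `DistributionBolt`, the rule format (exact field matches plus a target topic), and the example field names are assumptions, since the article does not describe the actual rule syntax.

```python
import json


class DistributionBolt:
    """Keeps a rule cache and routes each message to matching topics."""

    def __init__(self):
        self.rules = {}   # rule id -> {"topic": str, "match": {field: value}}

    def update_rules(self, rules):
        """Called with full or incremental rule sets from the rule spout."""
        self.rules.update(rules)

    def route(self, raw_message):
        """Parse one JSON message and return the topics it should go to."""
        event = json.loads(raw_message)
        topics = []
        for rule_id in sorted(self.rules):
            rule = self.rules[rule_id]
            if all(event.get(k) == v for k, v in rule["match"].items()):
                topics.append(rule["topic"])
        return topics
```

For example, a rule `{"topic": "app_a_play", "match": {"app": "app_a", "event": "play"}}` would route only play events of the hypothetical `app_a` to the `app_a_play` topic, so the subscribing business never has to consume the full stream.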

Open Data

Once we had raw data and the ability to do offline and real-time data development, demand for data development surged and the data development team was overwhelmed. So we opened up data computing and storage capabilities through the data platform, giving the business side the ability to do data development itself.

We will not elaborate on the implementation of metadata management, task scheduling, data integration, DAG task orchestration, visualization, and so on. Instead we focus on Meitu's practical experience with stability after opening up data.

Data openness and system stability have a love-hate relationship. On the one hand, once data is opened up, the people developing against it are no longer engineers with a data background, so submitted tasks are often illegal or consume excessive resources, which causes serious stability problems for the underlying computing and storage clusters. On the other hand, it is precisely because of data openness that we are continually pushed to improve system stability.

For the many high-resource and illegal tasks, we first considered whether checks and constraints could be applied at the HiveSQL level. Figure 13 shows the whole process by which HiveSQL is parsed and compiled into executable MR:

Figure 13

First, the syntax is parsed with Antlr to generate an AST. Semantic analysis then turns the AST into the Java object QueryBlock. A logical plan is generated from the QueryBlock and logically optimized; finally a physical plan is generated, physically optimized, and converted into an executable MR task.

Our checks mainly happen after the QueryBlock has been generated in the semantic analysis phase. We take the QueryBlock and run many statement checks on it, including illegal operations, query condition restrictions, and high resource consumption checks.
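The kinds of checks listed above can be illustrated with a small sketch. It does not use Hive's real QueryBlock class: `QuerySummary` is a hypothetical, much simplified stand-in, and the specific rules (a forbidden-operation list, a required partition filter, a cross-join check) are assumed examples rather than Meitu's actual rule set.

```python
from dataclasses import dataclass, field


@dataclass
class QuerySummary:
    """Simplified, hypothetical stand-in for what semantic analysis yields."""
    operation: str                                   # e.g. "SELECT", "DROP_TABLE"
    tables: list = field(default_factory=list)
    filtered_columns: dict = field(default_factory=dict)  # table -> set of columns
    cross_join: bool = False


# Assumed example of operations that ordinary users may not run.
FORBIDDEN_OPERATIONS = {"DROP_TABLE", "DROP_DATABASE", "TRUNCATE"}


def check_query(query, partition_columns):
    """Return a list of problems; an empty list means the query may run.

    partition_columns: {table: partition column that must be filtered on}
    """
    problems = []
    if query.operation in FORBIDDEN_OPERATIONS:
        problems.append("illegal operation: " + query.operation)
    for table in query.tables:
        part_col = partition_columns.get(table)
        filtered = query.filtered_columns.get(table, set())
        if part_col and part_col not in filtered:
            problems.append(
                "query on %s must filter on partition column %s" % (table, part_col)
            )
    if query.cross_join:
        problems.append("cross join without a join condition is too expensive")
    return problems
```

Rejecting a query at this point is cheap, because no logical or physical plan has been built yet and no cluster resources have been requested.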

The second stability practice focuses on optimizing the clusters, including the following.

  • We did a complete upgrade of the Hive and Hadoop clusters. One reason is that the older versions had bugs that we had patched ourselves or fixed by merging community patches, and these were all fixed in later versions; the other is the features and performance optimizations of the new versions. We upgraded Hive from 0.13 to 2.1 and Hadoop from 2.4 to 2.7.
  • We optimized the HA deployment of Hive. We split HiveServer and MetaStoreServer and deployed each on multiple nodes, so that they no longer run in one combined service where they can affect each other.
  • The execution engine used to be essentially MapReduce. We are now migrating to Hive on Spark, gradually switching online tasks from Hive on MR to Hive on Spark.
  • We maintain an internal branch to fix bugs we commonly run into and to merge features from community patches.

The last stability practice is strengthening permissions and security, to prevent illegal access to clusters and data, attacks, and so on. Permission hardening comes in two main parts: API access and clusters.

Figure 14

  • API Server: as mentioned above, we have OneDataAPI, which provides a unified API through which business systems access data. A business system must first obtain a token from the CA and then call OneDataAPI; only after the CA has verified the token does OneDataAPI let a legitimate request reach the data, which prevents business systems from accessing arbitrary data metrics.
  • Cluster: at present we mainly rely on Apache Ranger to manage and maintain authorization in a unified way across the clusters, including Kafka, HBase, Hadoop, and others.

These are some of the practices and optimizations that Meitu has made to the stability of its data platform after it has been built and opened to various business lines.

Finally, here is a brief summary of the data platform building process.

  • First of all, before building a data platform, be sure to understand the business. Check whether the overall business volume is large, whether the business lines are broad, and whether demand is so high that it seriously hurts productivity. If the answer is yes, consider building a data platform as soon as possible to make data development and application faster and more efficient. If business volume and demand are modest, there is no need to force big data onto the problem or to build an elaborate platform; quickly supporting the business takes priority.
  • While building the platform, focus on data quality and platform stability. For example, pay attention to the completeness and timeliness of data collection and to the unique identification of devices, and invest in optimizing platform stability, so that the business side gets a stable and reliable platform.
  • Once analysis and decision-making have become more efficient and the scale keeps growing, costs and resources need some optimization and thought.

