Talking about the implementation of JPA's batch operation (Intefrankly)


preface

This article looks at how JPA batch operations are implemented, following the code path through Hibernate (hibernate-core 5.0.12.Final).

save method

SessionImpl.persist

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/internal/SessionImpl.java

    @Override
    public void persist(String entityName, Object object) throws HibernateException {
        firePersist( new PersistEvent( entityName, object, this ) );
    }

    private void firePersist(PersistEvent event) {
        errorIfClosed();
        checkTransactionSynchStatus();
        checkNoUnresolvedActionsBeforeOperation();
        for ( PersistEventListener listener : listeners( EventType.PERSIST ) ) {
            listener.onPersist( event );
        }
        checkNoUnresolvedActionsAfterOperation();
    }

persist() does not run any SQL itself. It fires a PERSIST event, which each registered PersistEventListener then handles.
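The net effect is that persisting an entity only queues work. Below is a minimal sketch of that deferral. It uses hypothetical MiniSession and InsertAction classes, which are not Hibernate types, and its "SQL" is just a recorded string. persist() records an action, and nothing is executed until flush() drains the queue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a queued insert (not Hibernate's EntityInsertAction).
final class InsertAction {
    private final String entityName;
    private final Object id;

    InsertAction(String entityName, Object id) {
        this.entityName = entityName;
        this.id = id;
    }

    String describe() {
        return "insert " + entityName + " id=" + id;
    }
}

// Hypothetical session: persist() only queues, flush() turns the queue into "executed SQL".
final class MiniSession {
    private final List<InsertAction> actionQueue = new ArrayList<>();
    private final List<String> executed = new ArrayList<>();

    void persist(String entityName, Object id) {
        // No database access here: the insert is remembered until flush time.
        actionQueue.add(new InsertAction(entityName, id));
    }

    void flush() {
        for (InsertAction action : actionQueue) {
            executed.add(action.describe());
        }
        actionQueue.clear();
    }

    int pendingActions() { return actionQueue.size(); }

    List<String> executed() { return executed; }

    public static void main(String[] args) {
        MiniSession session = new MiniSession();
        session.persist("DemoUser", 1L);
        session.persist("DemoUser", 2L);
        System.out.println("before flush: " + session.pendingActions() + " queued, "
                + session.executed().size() + " executed");
        session.flush();
        System.out.println("after flush: " + session.pendingActions() + " queued, "
                + session.executed().size() + " executed");
    }
}
```

In this model, every persist() call is cheap because it only adds an entry to the queue. All of the database work is concentrated in flush(), and that is exactly the point where batching becomes possible.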

The flush method

SessionImpl.flush

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/internal/SessionImpl.java

    @Override
    public void flush() throws HibernateException {
        errorIfClosed();
        checkTransactionSynchStatus();
        if ( persistenceContext.getCascadeLevel() > 0 ) {
            throw new HibernateException( "Flush during cascade is dangerous" );
        }
        FlushEvent flushEvent = new FlushEvent( this );
        for ( FlushEventListener listener : listeners( EventType.FLUSH ) ) {
            listener.onFlush( flushEvent );
        }
        delayedAfterCompletion();
    }

DefaultFlushEventListener.onFlush

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/DefaultFlushEventListener.java

    /** Handle the given flush event.
     *
     * @param event The flush event to be handled.
     * @throws HibernateException
     */
    public void onFlush(FlushEvent event) throws HibernateException {
        final EventSource source = event.getSession();
        final PersistenceContext persistenceContext = source.getPersistenceContext();

        if ( persistenceContext.getNumberOfManagedEntities() > 0 ||
                persistenceContext.getCollectionEntries().size() > 0 ) {

            try {
                source.getEventListenerManager().flushStart();

                flushEverythingToExecutions( event );
                performExecutions( source );
                postFlush( source );
            }
            finally {
                source.getEventListenerManager().flushEnd(
                        event.getNumberOfEntitiesProcessed(),
                        event.getNumberOfCollectionsProcessed()
                );
            }

            postPostFlush( source );

            if ( source.getFactory().getStatistics().isStatisticsEnabled() ) {
                source.getFactory().getStatisticsImplementor().flush();
            }
        }
    }

Once flushEverythingToExecutions() has run, performExecutions() is invoked.

AbstractFlushingEventListener.performExecutions

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/AbstractFlushingEventListener.java

    /**
     * Execute all SQL (and second-level cache updates) in a special order so that foreign-key constraints cannot
     * be violated: <ol>
     * <li> Inserts, in the order they were performed
     * <li> Updates
     * <li> Deletion of collection elements
     * <li> Insertion of collection elements
     * <li> Deletes, in the order they were performed
     * </ol>
     *
     * @param session The session being flushed
     */
    protected void performExecutions(EventSource session) {
        LOG.trace( "Executing flush" );

        // IMPL NOTE : here we alter the flushing flag of the persistence context to allow
        //        during-flush callbacks more leniency in regards to initializing proxies and
        //        lazy collections during their processing.
        // For more information, see HHH-2763
        try {
            session.getJdbcCoordinator().flushBeginning();
            session.getPersistenceContext().setFlushing( true );
            // we need to lock the collection caches before executing entity inserts/updates in order to
            // account for bi-directional associations
            session.getActionQueue().prepareActions();
            session.getActionQueue().executeActions();
        }
        finally {
            session.getPersistenceContext().setFlushing( false );
            session.getJdbcCoordinator().flushEnding();
        }
    }

This in turn calls session.getActionQueue().executeActions().
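The javadoc of performExecutions above fixes the order in which queued work runs, regardless of the order in which it was queued. Here is a minimal sketch of that idea. It assumes a hypothetical OrderedActionQueue that keeps one list per kind of action. The kinds and their order follow that javadoc list, so this is not a copy of Hibernate's EXECUTABLE_LISTS_MAP.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Kinds of queued work, declared in the order the performExecutions javadoc lists them.
enum ActionKind { INSERT, UPDATE, COLLECTION_REMOVE, COLLECTION_INSERT, DELETE }

// Hypothetical queue: one list per kind, drained kind by kind.
final class OrderedActionQueue {
    private final Map<ActionKind, List<String>> lists = new EnumMap<>(ActionKind.class);

    void add(ActionKind kind, String description) {
        lists.computeIfAbsent(kind, k -> new ArrayList<>()).add(description);
    }

    List<String> executeActions() {
        List<String> executed = new ArrayList<>();
        // EnumMap iterates in declaration order, so all inserts run before any delete.
        for (List<String> list : lists.values()) {
            executed.addAll(list); // within one kind, the queuing order is kept
            list.clear();
        }
        return executed;
    }

    public static void main(String[] args) {
        OrderedActionQueue queue = new OrderedActionQueue();
        queue.add(ActionKind.DELETE, "delete order 7");
        queue.add(ActionKind.INSERT, "insert user 1");
        System.out.println(queue.executeActions());
    }
}
```

The delete is queued first here, but it still runs after the insert. This matches the javadoc's goal of never violating foreign-key constraints because of the order in which calls happened to be made.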

ActionQueue.executeActions

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/spi/ActionQueue.java

    /**
     * Perform all currently queued actions.
     * 
     * @throws HibernateException error executing queued actions.
     */
    public void executeActions() throws HibernateException {
        if ( hasUnresolvedEntityInsertActions() ) {
            throw new IllegalStateException( "About to execute actions, but there are unresolved entity insert actions." );
        }

        for ( ListProvider listProvider : EXECUTABLE_LISTS_MAP.values() ) {
            ExecutableList<?> l = listProvider.get( this );
            if ( l != null && !l.isEmpty() ) {
                executeActions( l );
            }
        }
    }

    /**
     * Perform {@link org.hibernate.action.spi.Executable#execute()} on each element of the list
     * 
     * @param list The list of Executable elements to be performed
     *
     * @throws HibernateException
     */
    private <E extends Executable & Comparable<?> & Serializable> void executeActions(ExecutableList<E> list) throws HibernateException {
        // todo : consider ways to improve the double iteration of Executables here:
        //        1) we explicitly iterate list here to perform Executable#execute()
        //        2) ExecutableList#getQuerySpaces also iterates the Executables to collect query spaces.
        try {
            for ( E e : list ) {
                try {
                    e.execute();
                }
                finally {
                    if( e.getBeforeTransactionCompletionProcess() != null ) {
                        if( beforeTransactionProcesses == null ) {
                            beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session );
                        }
                        beforeTransactionProcesses.register(e.getBeforeTransactionCompletionProcess());
                    }
                    if( e.getAfterTransactionCompletionProcess() != null ) {
                        if( afterTransactionProcesses == null ) {
                            afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session );
                        }
                        afterTransactionProcesses.register(e.getAfterTransactionCompletionProcess());
                    }
                }
            }
        }
        finally {
            if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) {
                // Strictly speaking, only a subset of the list may have been processed if a RuntimeException occurs.
                // We still invalidate all spaces. I don't see this as a big deal - after all, RuntimeExceptions are
                // unexpected.
                Set<Serializable> propertySpaces = list.getQuerySpaces();
                invalidateSpaces( propertySpaces.toArray( new Serializable[propertySpaces.size()] ) );
            }
        }

        list.clear();
        session.getJdbcCoordinator().executeBatch();
    }

Inside the for loop, e.execute() is called on each action. After the loop (and its finally block), session.getJdbcCoordinator().executeBatch() is called. This fits the usual JDBC Statement pattern of addBatch() followed by executeBatch(). We can therefore expect e.execute() to perform the addBatch() step, with the batch being executed each time it fills up and the final executeBatch() sending whatever is left over.
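That reading can be checked with a small simulation. The sketch below assumes a hypothetical RecordingBatch that counts addBatch() calls and "executes" automatically once batchSize statements are pending. ExecuteActionsSketch.executeActions() mirrors the shape of the loop above: each action's execute() adds one statement, and a single trailing executeBatch() sends the remainder. Neither class is a Hibernate type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batch that records the size of every batch it executes.
final class RecordingBatch {
    private final int batchSize;
    private int pending = 0;
    final List<Integer> executedBatchSizes = new ArrayList<>();

    RecordingBatch(int batchSize) { this.batchSize = batchSize; }

    void addToBatch() {
        pending++;
        if (pending == batchSize) { // a full batch is sent immediately
            executeBatch();
        }
    }

    void executeBatch() {
        if (pending > 0) { // an empty batch sends nothing
            executedBatchSizes.add(pending);
            pending = 0;
        }
    }
}

final class ExecuteActionsSketch {
    // Each Runnable plays the role of an Executable whose execute() ends in addToBatch().
    static void executeActions(List<Runnable> actions, RecordingBatch batch) {
        for (Runnable action : actions) {
            action.run();
        }
        actions.clear();
        batch.executeBatch(); // send the partial batch left after the loop
    }

    public static void main(String[] args) {
        RecordingBatch batch = new RecordingBatch(2);
        List<Runnable> actions = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            actions.add(batch::addToBatch);
        }
        executeActions(actions, batch);
        System.out.println(batch.executedBatchSizes);
    }
}
```

With five actions and a batch size of 2, the batches have sizes 2, 2 and 1. The last batch exists only because of the trailing executeBatch() call after the loop.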

EntityInsertAction.execute

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/action/internal/EntityInsertAction.java

    @Override
    public void execute() throws HibernateException {
        nullifyTransientReferencesIfNotAlready();

        final EntityPersister persister = getPersister();
        final SessionImplementor session = getSession();
        final Object instance = getInstance();
        final Serializable id = getId();

        final boolean veto = preInsert();

        // Don't need to lock the cache here, since if someone
        // else inserted the same pk first, the insert would fail

        if ( !veto ) {

            persister.insert( id, getState(), instance, session );
            PersistenceContext persistenceContext = session.getPersistenceContext();
            final EntityEntry entry = persistenceContext.getEntry( instance );
            if ( entry == null ) {
                throw new AssertionFailure( "possible non-threadsafe access to session" );
            }

            entry.postInsert( getState() );

            if ( persister.hasInsertGeneratedProperties() ) {
                persister.processInsertGeneratedProperties( id, instance, getState(), session );
                if ( persister.isVersionPropertyGenerated() ) {
                    version = Versioning.getVersion( getState(), persister );
                }
                entry.postUpdate( instance, getState(), version );
            }

            persistenceContext.registerInsertedKey( persister, getId() );
        }

        final SessionFactoryImplementor factory = session.getFactory();

        if ( isCachePutEnabled( persister, session ) ) {
            final CacheEntry ce = persister.buildCacheEntry(
                    instance,
                    getState(),
                    version,
                    session
            );
            cacheEntry = persister.getCacheEntryStructure().structure( ce );
            final EntityRegionAccessStrategy cache = persister.getCacheAccessStrategy();
            final Object ck = cache.generateCacheKey( id, persister, factory, session.getTenantIdentifier() );

            final boolean put = cacheInsert( persister, ck );

            if ( put && factory.getStatistics().isStatisticsEnabled() ) {
                factory.getStatisticsImplementor().secondLevelCachePut( cache.getRegion().getName() );
            }
        }

        handleNaturalIdPostSaveNotifications( id );

        postInsert();

        if ( factory.getStatistics().isStatisticsEnabled() && !veto ) {
            factory.getStatisticsImplementor().insertEntity( getPersister().getEntityName() );
        }

        markExecuted();
    }

Unless preInsert() vetoes it, execute() calls the persister's insert() method.

AbstractEntityPersister.insert

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/persister/entity/AbstractEntityPersister.java

    public void insert(Serializable id, Object[] fields, Object object, SessionImplementor session) {
        // apply any pre-insert in-memory value generation
        preInsertInMemoryValueGeneration( fields, object, session );

        final int span = getTableSpan();
        if ( entityMetamodel.isDynamicInsert() ) {
            // For the case of dynamic-insert="true", we need to generate the INSERT SQL
            boolean[] notNull = getPropertiesToInsert( fields );
            for ( int j = 0; j < span; j++ ) {
                insert( id, fields, notNull, j, generateInsertString( notNull, j ), object, session );
            }
        }
        else {
            // For the case of dynamic-insert="false", use the static SQL
            for ( int j = 0; j < span; j++ ) {
                insert( id, fields, getPropertyInsertability(), j, getSQLInsertStrings()[j], object, session );
            }
        }
    }

insert

    /**
     * Perform an SQL INSERT.
     * <p/>
     * This for is used for all non-root tables as well as the root table
     * in cases where the identifier value is known before the insert occurs.
     */
    protected void insert(
            final Serializable id,
            final Object[] fields,
            final boolean[] notNull,
            final int j,
            final String sql,
            final Object object,
            final SessionImplementor session) throws HibernateException {

        if ( isInverseTable( j ) ) {
            return;
        }

        //note: it is conceptually possible that a UserType could map null to
        //      a non-null value, so the following is arguable:
        if ( isNullableTable( j ) && isAllNull( fields, j ) ) {
            return;
        }

        if ( LOG.isTraceEnabled() ) {
            LOG.tracev( "Inserting entity: {0}", MessageHelper.infoString( this, id, getFactory() ) );
            if ( j == 0 && isVersioned() ) {
                LOG.tracev( "Version: {0}", Versioning.getVersion( fields, this ) );
            }
        }

        // TODO : shouldn't inserts be Expectations.NONE?
        final Expectation expectation = Expectations.appropriateExpectation( insertResultCheckStyles[j] );
        // we can't batch joined inserts, *especially* not if it is an identity insert;
        // nor can we batch statements where the expectation is based on an output param
        final boolean useBatch = j == 0 && expectation.canBeBatched();
        if ( useBatch && inserBatchKey == null ) {
            inserBatchKey = new BasicBatchKey(
                    getEntityName() + "#INSERT",
                    expectation
            );
        }
        final boolean callable = isInsertCallable( j );

        try {
            // Render the SQL query
            final PreparedStatement insert;
            if ( useBatch ) {
                insert = session
                        .getJdbcCoordinator()
                        .getBatch( inserBatchKey )
                        .getBatchStatement( sql, callable );
            }
            else {
                insert = session
                        .getJdbcCoordinator()
                        .getStatementPreparer()
                        .prepareStatement( sql, callable );
            }

            try {
                int index = 1;
                index += expectation.prepare( insert );

                // Write the values of fields onto the prepared statement - we MUST use the state at the time the
                // insert was issued (cos of foreign key constraints). Not necessarily the object's current state

                dehydrate( id, fields, null, notNull, propertyColumnInsertable, j, insert, session, index, false );

                if ( useBatch ) {
                    session.getJdbcCoordinator().getBatch( inserBatchKey ).addToBatch();
                }
                else {
                    expectation.verifyOutcome(
                            session.getJdbcCoordinator()
                                    .getResultSetReturn()
                                    .executeUpdate( insert ), insert, -1
                    );
                }

            }
            catch (SQLException e) {
                if ( useBatch ) {
                    session.getJdbcCoordinator().abortBatch();
                }
                throw e;
            }
            finally {
                if ( !useBatch ) {
                    session.getJdbcCoordinator().getResourceRegistry().release( insert );
                    session.getJdbcCoordinator().afterStatementExecution();
                }
            }
        }
        catch (SQLException e) {
            throw getFactory().getSQLExceptionHelper().convert(
                    e,
                    "could not insert: " + MessageHelper.infoString( this ),
                    sql
            );
        }

    }

Because useBatch is true here (this is the first table, j == 0, and the expectation can be batched), session.getJdbcCoordinator().getBatch( inserBatchKey ).addToBatch() is invoked. In this example, inserBatchKey is com.example.domain.DemoUser#INSERT.
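In the version shown, the batching decision is made per table. For an entity whose insert spans several tables (the loop over getTableSpan() in insert() above), only table 0 can go into the batch, and only if its expectation can be batched. The other tables are executed immediately. Below is a sketch of that rule, using a hypothetical InsertPlan class. Its canBeBatched flag stands in for Expectation.canBeBatched().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the per-table loop in AbstractEntityPersister.insert().
final class InsertPlan {
    // Same condition as the source: useBatch = j == 0 && expectation.canBeBatched()
    static boolean useBatch(int tableIndex, boolean canBeBatched) {
        return tableIndex == 0 && canBeBatched;
    }

    // Returns "batched" or "immediate" for each table the entity's insert touches.
    static List<String> plan(int tableSpan, boolean canBeBatched) {
        List<String> result = new ArrayList<>();
        for (int j = 0; j < tableSpan; j++) {
            result.add(useBatch(j, canBeBatched) ? "batched" : "immediate");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(plan(1, true));  // single-table entity
        System.out.println(plan(2, true));  // entity spanning two tables
        System.out.println(plan(2, false)); // expectation that cannot be batched
    }
}
```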

JdbcCoordinatorImpl.getBatch

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/jdbc/internal/JdbcCoordinatorImpl.java

    @Override
    public Batch getBatch(BatchKey key) {
        if ( currentBatch != null ) {
            if ( currentBatch.getKey().equals( key ) ) {
                return currentBatch;
            }
            else {
                currentBatch.execute();
                currentBatch.release();
            }
        }
        currentBatch = batchBuilder().buildBatch( key, this );
        return currentBatch;
    }
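getBatch() keeps only one current batch. Requesting a different key executes and releases the previous batch first. One consequence is that interleaving inserts of different entity types splits the batches. Below is a minimal sketch under stated simplifications. It uses a hypothetical KeyedBatcher that stores only the current key and a count, and it ignores the batch-size limit. The key strings follow the EntityName#INSERT form built in insert() above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of JdbcCoordinatorImpl.getBatch(): only one batch is open at a time.
final class KeyedBatcher {
    private String currentKey;
    private int pending;
    // Each entry is "key xN" for a batch of N statements that was executed.
    final List<String> executed = new ArrayList<>();

    void add(String key) {
        if (currentKey != null && !currentKey.equals(key)) {
            flush(); // a different key forces the previous batch to run first
        }
        currentKey = key;
        pending++;
    }

    void flush() {
        if (currentKey != null && pending > 0) {
            executed.add(currentKey + " x" + pending);
        }
        currentKey = null;
        pending = 0;
    }

    public static void main(String[] args) {
        KeyedBatcher interleaved = new KeyedBatcher();
        for (String key : new String[] {"User#INSERT", "Order#INSERT", "User#INSERT", "Order#INSERT"}) {
            interleaved.add(key);
        }
        interleaved.flush();
        System.out.println(interleaved.executed);
    }
}
```

The interleaved sequence produces four one-statement batches, while the same four inserts grouped by entity produce two batches. Hibernate's hibernate.order_inserts setting is intended to address this: it sorts the queued inserts so that statements for the same entity end up next to each other.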

BatchingBatch.addToBatch

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/jdbc/batch/internal/BatchingBatch.java

    @Override
    public void addToBatch() {
        try {
            currentStatement.addBatch();
        }
        catch ( SQLException e ) {
            LOG.debugf( "SQLException escaped proxy", e );
            throw sqlExceptionHelper().convert( e, "could not perform addBatch", currentStatementSql );
        }
        statementPosition++;
        if ( statementPosition >= getKey().getBatchedStatementCount() ) {
            batchPosition++;
            if ( batchPosition == batchSize ) {
                notifyObserversImplicitExecution();
                performExecution();
                batchPosition = 0;
                batchExecuted = true;
            }
            statementPosition = 0;
        }
    }

Once enough statements have been added to fill a batch (batchPosition == batchSize), performExecution() is called.
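The batchSize compared against here is the JDBC batch size set through the hibernate.jdbc.batch_size property. As far as I can tell from BatchBuilderImpl in this version, the BatchingBatch shown above is only built when that size is greater than 1. Otherwise a NonBatchingBatch is used, which executes each statement immediately. Below is a sketch of that decision, using a hypothetical BatchChoice helper that reads the setting from java.util.Properties. Hibernate actually reads it during bootstrap, not like this, and treating a missing value as 0 is an assumption of the sketch.

```java
import java.util.Properties;

// Hypothetical helper mirroring the "size > 1 ? batching : non-batching" choice.
final class BatchChoice {
    static final String BATCH_SIZE = "hibernate.jdbc.batch_size";

    static int batchSize(Properties settings) {
        // Assumption for this sketch: an unset property means "no batching".
        return Integer.parseInt(settings.getProperty(BATCH_SIZE, "0").trim());
    }

    static String batchImplementation(Properties settings) {
        return batchSize(settings) > 1 ? "BatchingBatch" : "NonBatchingBatch";
    }

    public static void main(String[] args) {
        Properties settings = new Properties();
        System.out.println(batchImplementation(settings));
        settings.setProperty(BATCH_SIZE, "1000");
        System.out.println(batchImplementation(settings));
    }
}
```

In a Spring Boot application, the same property is usually passed through to Hibernate as spring.jpa.properties.hibernate.jdbc.batch_size.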

performExecution

    private void performExecution() {
        LOG.debugf( "Executing batch size: %s", batchPosition );
        try {
            for ( Map.Entry<String,PreparedStatement> entry : getStatements().entrySet() ) {
                try {
                    final PreparedStatement statement = entry.getValue();
                    final int[] rowCounts;
                    try {
                        getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchStart();
                        rowCounts = statement.executeBatch();
                    }
                    finally {
                        getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchEnd();
                    }
                    checkRowCounts( rowCounts, statement );
                }
                catch ( SQLException e ) {
                    abortBatch();
                    throw sqlExceptionHelper().convert( e, "could not execute batch", entry.getKey() );
                }
            }
        }
        catch ( RuntimeException re ) {
            LOG.unableToExecuteBatch( re.getMessage() );
            throw re;
        }
        finally {
            batchPosition = 0;
        }
    }

Here, statement.executeBatch() is finally called on each prepared statement in the batch, and the returned row counts are then checked by checkRowCounts().
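checkRowCounts() compares each count returned by executeBatch() with what the batch key's Expectation expects. Below is a simplified sketch of such a check for single-row inserts. It is written in the spirit of Hibernate's basic expectation rather than as a copy of it, and it uses a hypothetical RowCountCheck class. It relies on the real JDBC constants Statement.SUCCESS_NO_INFO and Statement.EXECUTE_FAILED, which a driver may return instead of an actual count.

```java
import java.sql.Statement;

// Hypothetical, simplified row-count check for a batch of single-row INSERTs.
final class RowCountCheck {
    static void check(int[] rowCounts) {
        for (int i = 0; i < rowCounts.length; i++) {
            int count = rowCounts[i];
            if (count == Statement.SUCCESS_NO_INFO) {
                continue; // statement succeeded, but the driver reported no count
            }
            if (count == Statement.EXECUTE_FAILED) {
                throw new IllegalStateException("batch statement " + i + " failed");
            }
            if (count != 1) {
                throw new IllegalStateException(
                        "batch statement " + i + ": expected 1 row, got " + count);
            }
        }
    }

    public static void main(String[] args) {
        check(new int[] {1, 1, Statement.SUCCESS_NO_INFO});
        System.out.println("row counts accepted");
    }
}
```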

summary

  • JPA's save method first builds an insert action and adds it to the action queue; no INSERT is executed at that point
  • At flush time, each insert action adds its statement to a JDBC batch, and the batch is only executed once it is full (whatever remains is executed after the last action in the list)
  • JPA's batch operations are therefore a wrapper around JDBC's Statement addBatch and executeBatch; see ActionQueue.executeActions for details

The underlying model corresponds to the following plain JDBC template:

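    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;

    // Assumes a javax.sql.DataSource field named dataSource and an Employee bean
    // with getName(), getCity() and getPhone().
    public void jdbcBatchOperationTemplate(List<Employee> data) {
        String sql = "insert into employee (name, city, phone) values (?, ?, ?)";

        final int batchSize = 1000;
        int count = 0;

        // try-with-resources closes the statement and connection (replaces DbUtils.closeQuietly)
        try (Connection conn = dataSource.getConnection();
             PreparedStatement pstmt = conn.prepareStatement(sql)) {

            for (Employee item : data) {
                pstmt.setString(1, item.getName());
                pstmt.setString(2, item.getCity());
                pstmt.setString(3, item.getPhone());

                // Add this row's parameters to the batch
                pstmt.addBatch();

                // Submit every batchSize rows so the pending batch cannot grow unbounded (avoids OOM)
                if (++count % batchSize == 0) {
                    pstmt.executeBatch();
                }
            }

            pstmt.executeBatch(); // Submit the remaining rows

        } catch (SQLException e) {
            // Propagate the failure instead of swallowing it with printStackTrace()
            throw new IllegalStateException("batch insert failed", e);
        }
    }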

The only difference is that JPA first puts all the data into the action queue at save time, and the flush at the end then triggers addBatch and executeBatch calls similar to the ones above. With @GeneratedValue(strategy = GenerationType.AUTO), the database is queried for an id on every save, before the entity is added to the action queue. So to bulk-insert 1000 rows, the save calls first issue 1000 queries to obtain the ids. Then, at flush time, the 1000 queued rows are executed in batches, which is equivalent to calling the template above with a data list of 1000 Employee objects that already carry their ids. Each id lookup is a query like the following:

    select
        nextval ('hibernate_sequence')
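The cost of those id lookups can be estimated with a rough, simplified model. Assume that each nextval call hands out one block of incrementSize ids, and ignore any extra call a pooled optimizer may make at start-up. Under that model, assigning ids to n entities takes ceil(n / incrementSize) sequence round trips. That is 1000 round trips for the per-entity lookups described above, but only 20 with an allocation size of 50 (for example via @SequenceGenerator(allocationSize = 50)). The SequenceRoundTrips class below is a hypothetical illustration, not a Hibernate type.

```java
// Hypothetical estimate of sequence round trips needed to assign ids to new entities.
final class SequenceRoundTrips {
    // incrementSize = 1: one "select nextval(...)" per entity, as in the query above.
    // incrementSize > 1: one call per block of ids (simplified pooled-style model).
    static long roundTrips(long entityCount, int incrementSize) {
        if (incrementSize < 1) {
            throw new IllegalArgumentException("incrementSize must be >= 1");
        }
        if (entityCount <= 0) {
            return 0;
        }
        return (entityCount + incrementSize - 1) / incrementSize; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(roundTrips(1000, 1));
        System.out.println(roundTrips(1000, 50));
    }
}
```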

