Class DataExtractorService
java.lang.Object
org.jumpmind.symmetric.service.impl.AbstractService
org.jumpmind.symmetric.service.impl.DataExtractorService
- All Implemented Interfaces:
IDataExtractorService,INodeCommunicationService.INodeCommunicationExecutor,IService
- Direct Known Subclasses:
FileSyncExtractorService
public class DataExtractorService
extends AbstractService
implements IDataExtractorService, INodeCommunicationService.INodeCommunicationExecutor
- See Also:
-
Nested Class Summary
Nested Classes -
Field Summary
FieldsFields inherited from class org.jumpmind.symmetric.service.impl.AbstractService
log, parameterService, platform, sqlTemplate, sqlTemplateDirty, symmetricDialect, tablePrefix -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionprotected ExtractDataReaderbuildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo) protected ExtractDataReaderbuildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo, boolean containsBigLob) protected MultiBatchStagingWriterbuildMultiBatchStagingWriter(ExtractRequest request, List<ExtractRequest> childRequests, Node sourceNode, Node targetNode, List<OutgoingBatch> batches, ProcessInfo processInfo, Channel channel, boolean isRestarted) intcancelExtractRequests(long loadId) protected booleancanProcessExtractRequest(ExtractRequest request, NodeCommunication.CommunicationType communicationType) protected final booleanchangeBatchStatus(AbstractBatch.Status status, OutgoingBatch currentBatch, DataExtractorService.ExtractMode mode) protected voidcheckSendDeferredConstraints(ExtractRequest request, List<ExtractRequest> childRequests, Node targetNode) protected voidcleanupIgnoredBatch(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, IDataWriter writer) protected TransformWritercreateTransformDataWriter(Node identity, Node targetNode, IDataWriter extractWriter) voidexecute(NodeCommunication nodeCommunication, RemoteNodeStatus status) This is a callback method used by the NodeCommunicationService that extracts an initial load in the background.extract(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) protected List<OutgoingBatch>extract(ProcessInfo extractInfo, Node targetNode, List<OutgoingBatch> activeBatches, IDataWriter dataWriter, BufferedWriter writer, DataExtractorService.ExtractMode mode) extract(ProcessInfo extractInfo, Node targetNode, IOutgoingTransport transport) protected org.jumpmind.symmetric.service.impl.DataExtractorService.FutureOutgoingBatchextractBatch(OutgoingBatch extractBatch, org.jumpmind.symmetric.service.impl.DataExtractorService.FutureExtractStatus status, ProcessInfo extractInfo, Node targetNode, IDataWriter dataWriter, DataExtractorService.ExtractMode mode, List<OutgoingBatch> activeBatches) booleanextractBatchRange(Writer writer, String nodeId, long startBatchId, long endBatchId) booleanextractBatchRange(Writer writer, String nodeId, Date startBatchTime, Date endBatchTime, String... channelIds) voidextractConfigurationStandalone(Node targetNode, Writer writer, String... tablesToExclude) Extract the SymmetricDS configuration for the passed inNode.booleanextractOnlyOutgoingBatch(String nodeId, long batchId, Writer writer) This method will extract an outgoing batch, but will not update the outgoing batch statusprotected OutgoingBatchextractOutgoingBatch(ProcessInfo extractInfo, Node targetNode, IDataWriter dataWriter, OutgoingBatch currentBatch, boolean useStagingDataWriter, boolean updateBatchStatistics, DataExtractorService.ExtractMode mode, IDataProcessorListener listener) extractToPayload(ProcessInfo processInfo, Node targetNode, StructureDataWriter.PayloadType payloadType, boolean useJdbcTimestampFormat, boolean useUpsertStatements, boolean useDelimiterIdentifiers) protected intfindStatsIndex(String bufferString, String prevBuffer) protected StringgetBatchStats(OutgoingBatch batch) protected StringgetCompletedTablesForExtractByLoadId(long loadId) getCompletedTablesForExtractByLoadIdAndNodeId(long loadId, String nodeId) protected List<ExtractRequest>getExtractChildRequestsForNode(ExtractRequest parentRequest) protected Map<Long,List<ExtractRequest>> getExtractChildRequestsForNode(NodeCommunication nodeCommunication, List<ExtractRequest> parentRequests) protected ExtractRequestList<org.jumpmind.symmetric.service.impl.DataExtractorService.NodeQueuePair>protected List<ExtractRequest>getExtractRequestsForNode(NodeCommunication nodeCommunication) protected StatisticsgetExtractStats(IDataWriter writer, OutgoingBatch currentBatch) getPendingTablesForExtractByLoadId(long loadId) getPendingTablesForExtractByLoadIdAndNodeId(long loadId, String nodeId) protected ProcessTypeprotected StringgetSemaphoreKey(OutgoingBatch batch, boolean useStagingDataWriter) protected IStagedResourcegetStagedResource(OutgoingBatch currentBatch) protected booleanisApplicable(NodeCommunication nodeCommunication) protected booleanisPreviouslyExtracted(OutgoingBatch currentBatch, boolean acquireReference) protected booleanisRetry(OutgoingBatch currentBatch, Node remoteNode) protected booleanprotected OutgoingBatchesloadPendingBatches(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) protected voidqueue(String nodeId, String queue, RemoteNodeStatuses statuses) queueWork(boolean force) protected voidreleaseLock(org.jumpmind.symmetric.service.impl.DataExtractorService.BatchLock lock, OutgoingBatch batch, boolean useStagingDataWriter) voidvoidprotected final OutgoingBatchrequeryIfEnoughTimeHasPassed(long ts, OutgoingBatch currentBatch) If time has passed, then re-query the batch to double check that the status has not changedrequestExtractRequest(ISqlTransaction transaction, String nodeId, String queue, TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String table, long rows, long parentRequestId) voidresetExtractRequest(OutgoingBatch batch) protected voidrestartExtractRequest(List<OutgoingBatch> batches, ExtractRequest request, List<ExtractRequest> childRequests) protected OutgoingBatchsendOutgoingBatch(ProcessInfo processInfo, Node targetNode, OutgoingBatch currentBatch, boolean isRetry, IDataWriter dataWriter, BufferedWriter writer, DataExtractorService.ExtractMode mode) protected voidtransferFromStaging(DataExtractorService.ExtractMode mode, Batch.BatchType batchType, OutgoingBatch batch, boolean isRetry, IStagedResource stagedResource, BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec, ProcessInfo processInfo) protected voidtriggerReExtraction(OutgoingBatch currentBatch) voidupdateExtractRequestLoadTime(ISqlTransaction transaction, Date loadTime, OutgoingBatch outgoingBatch) protected voidupdateExtractRequestStatus(ISqlTransaction transaction, long extractId, ExtractRequest.ExtractStatus status, long extractedRows, long extractedMillis) voidupdateExtractRequestStatuses(ISqlTransaction transaction, long loadId, String sourceNodeId, String fromStatus, String toStatus) voidupdateExtractRequestTransferred(OutgoingBatch batch, long transferMillis) protected IDataWriterwrapWithTransformWriter(Node sourceNode, Node targetNode, ProcessInfo processInfo, IDataWriter dataWriter, boolean useStagingDataWriter) protected booleanwriteBatchStats(BufferedWriter writer, char[] buffer, int bufferSize, String prevBuffer, OutgoingBatch batch) protected voidwriteKeepAliveAck(BufferedWriter writer, Node sourceNode, boolean streamToFileEnabled) Methods inherited from class org.jumpmind.symmetric.service.impl.AbstractService
assertNotNull, buildBatchOrderBy, buildBatchParams, buildBatchWhere, buildBatchWhereFromFilter, close, createSqlReplacementTokens, getJdbcTemplate, getParameterService, getSql, getSymmetricDialect, getTablePrefix, getTargetDialect, getTargetPlatform, getTargetPlatform, isCalledFromSymmetricAdminTool, isSet, isStreamClosedByClient, isSymmetricTable, logOnce, maxDate, readAcks, sendAck, setSqlMap, synchronize, toNodeIds, toNodeIds
-
Field Details
-
engine
-
-
Constructor Details
-
DataExtractorService
-
-
Method Details
-
extractConfigurationStandalone
public void extractConfigurationStandalone(Node targetNode, Writer writer, String... tablesToExclude) Extract the SymmetricDS configuration for the passed inNode.- Specified by:
extractConfigurationStandalonein interfaceIDataExtractorService
-
extractToPayload
public List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo, Node targetNode, StructureDataWriter.PayloadType payloadType, boolean useJdbcTimestampFormat, boolean useUpsertStatements, boolean useDelimiterIdentifiers) - Specified by:
extractToPayloadin interfaceIDataExtractorService
-
extract
public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, IOutgoingTransport transport) - Specified by:
extractin interfaceIDataExtractorService- Returns:
- a list of batches that were extracted
-
extract
public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) - Specified by:
extractin interfaceIDataExtractorService
-
loadPendingBatches
protected OutgoingBatches loadPendingBatches(ProcessInfo extractInfo, Node targetNode, String queue, IOutgoingTransport transport) -
extractOnlyOutgoingBatch
This method will extract an outgoing batch, but will not update the outgoing batch status- Specified by:
extractOnlyOutgoingBatchin interfaceIDataExtractorService
-
extract
protected List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, List<OutgoingBatch> activeBatches, IDataWriter dataWriter, BufferedWriter writer, DataExtractorService.ExtractMode mode) -
extractBatch
protected org.jumpmind.symmetric.service.impl.DataExtractorService.FutureOutgoingBatch extractBatch(OutgoingBatch extractBatch, org.jumpmind.symmetric.service.impl.DataExtractorService.FutureExtractStatus status, ProcessInfo extractInfo, Node targetNode, IDataWriter dataWriter, DataExtractorService.ExtractMode mode, List<OutgoingBatch> activeBatches) throws Exception - Throws:
Exception
-
writeKeepAliveAck
protected void writeKeepAliveAck(BufferedWriter writer, Node sourceNode, boolean streamToFileEnabled) -
changeBatchStatus
protected final boolean changeBatchStatus(AbstractBatch.Status status, OutgoingBatch currentBatch, DataExtractorService.ExtractMode mode) -
requeryIfEnoughTimeHasPassed
If time has passed, then re-query the batch to double check that the status has not changed -
extractOutgoingBatch
protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targetNode, IDataWriter dataWriter, OutgoingBatch currentBatch, boolean useStagingDataWriter, boolean updateBatchStatistics, DataExtractorService.ExtractMode mode, IDataProcessorListener listener) -
getSemaphoreKey
-
acquireStagingFileLock
- Specified by:
acquireStagingFileLockin interfaceIDataExtractorService
-
releaseLock
protected void releaseLock(org.jumpmind.symmetric.service.impl.DataExtractorService.BatchLock lock, OutgoingBatch batch, boolean useStagingDataWriter) -
isStagingFileLockRequired
-
triggerReExtraction
-
buildExtractDataReader
protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo) -
buildExtractDataReader
protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, ProcessInfo processInfo, boolean containsBigLob) -
getExtractStats
-
wrapWithTransformWriter
protected IDataWriter wrapWithTransformWriter(Node sourceNode, Node targetNode, ProcessInfo processInfo, IDataWriter dataWriter, boolean useStagingDataWriter) -
cleanupIgnoredBatch
protected void cleanupIgnoredBatch(Node sourceNode, Node targetNode, OutgoingBatch currentBatch, IDataWriter writer) -
getStagedResource
-
isPreviouslyExtracted
-
isRetry
-
sendOutgoingBatch
protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode, OutgoingBatch currentBatch, boolean isRetry, IDataWriter dataWriter, BufferedWriter writer, DataExtractorService.ExtractMode mode) -
transferFromStaging
protected void transferFromStaging(DataExtractorService.ExtractMode mode, Batch.BatchType batchType, OutgoingBatch batch, boolean isRetry, IStagedResource stagedResource, BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec, ProcessInfo processInfo) -
findStatsIndex
-
getPendingTablesForExtractByLoadId
- Specified by:
getPendingTablesForExtractByLoadIdin interfaceIDataExtractorService
-
getCompletedTablesForExtractByLoadId
- Specified by:
getCompletedTablesForExtractByLoadIdin interfaceIDataExtractorService
-
getPendingTablesForExtractByLoadIdAndNodeId
- Specified by:
getPendingTablesForExtractByLoadIdAndNodeIdin interfaceIDataExtractorService
-
getCompletedTablesForExtractByLoadIdAndNodeId
public List<ExtractRequest> getCompletedTablesForExtractByLoadIdAndNodeId(long loadId, String nodeId) - Specified by:
getCompletedTablesForExtractByLoadIdAndNodeIdin interfaceIDataExtractorService
-
updateExtractRequestLoadTime
public void updateExtractRequestLoadTime(ISqlTransaction transaction, Date loadTime, OutgoingBatch outgoingBatch) - Specified by:
updateExtractRequestLoadTimein interfaceIDataExtractorService
-
updateExtractRequestTransferred
- Specified by:
updateExtractRequestTransferredin interfaceIDataExtractorService
-
cancelExtractRequests
public int cancelExtractRequests(long loadId) - Specified by:
cancelExtractRequestsin interfaceIDataExtractorService
-
writeBatchStats
protected boolean writeBatchStats(BufferedWriter writer, char[] buffer, int bufferSize, String prevBuffer, OutgoingBatch batch) throws IOException - Throws:
IOException
-
getBatchStatsColumns
-
getBatchStats
-
extractBatchRange
- Specified by:
extractBatchRangein interfaceIDataExtractorService
-
extractBatchRange
public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTime, Date endBatchTime, String... channelIds) - Specified by:
extractBatchRangein interfaceIDataExtractorService
-
createTransformDataWriter
protected TransformWriter createTransformDataWriter(Node identity, Node targetNode, IDataWriter extractWriter) -
queueWork
- Specified by:
queueWorkin interfaceIDataExtractorService
-
queue
-
getExtractRequestNodes
public List<org.jumpmind.symmetric.service.impl.DataExtractorService.NodeQueuePair> getExtractRequestNodes() -
getExtractRequestsForNode
-
getExtractRequestForBatch
-
getExtractChildRequestsForNode
protected Map<Long,List<ExtractRequest>> getExtractChildRequestsForNode(NodeCommunication nodeCommunication, List<ExtractRequest> parentRequests) -
getExtractChildRequestsForNode
-
resetExtractRequest
- Specified by:
resetExtractRequestin interfaceIDataExtractorService
-
requestExtractRequest
public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String nodeId, String queue, TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String table, long rows, long parentRequestId) - Specified by:
requestExtractRequestin interfaceIDataExtractorService
-
updateExtractRequestStatus
protected void updateExtractRequestStatus(ISqlTransaction transaction, long extractId, ExtractRequest.ExtractStatus status, long extractedRows, long extractedMillis) -
canProcessExtractRequest
protected boolean canProcessExtractRequest(ExtractRequest request, NodeCommunication.CommunicationType communicationType) -
execute
This is a callback method used by the NodeCommunicationService that extracts an initial load in the background.- Specified by:
executein interfaceINodeCommunicationService.INodeCommunicationExecutor
-
restartExtractRequest
protected void restartExtractRequest(List<OutgoingBatch> batches, ExtractRequest request, List<ExtractRequest> childRequests) -
releaseMissedExtractRequests
public void releaseMissedExtractRequests()- Specified by:
releaseMissedExtractRequestsin interfaceIDataExtractorService
-
checkSendDeferredConstraints
protected void checkSendDeferredConstraints(ExtractRequest request, List<ExtractRequest> childRequests, Node targetNode) -
isApplicable
-
buildMultiBatchStagingWriter
protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, List<ExtractRequest> childRequests, Node sourceNode, Node targetNode, List<OutgoingBatch> batches, ProcessInfo processInfo, Channel channel, boolean isRestarted) -
getProcessType
-
removeBatchFromStaging
- Specified by:
removeBatchFromStagingin interfaceIDataExtractorService
-
updateExtractRequestStatuses
public void updateExtractRequestStatuses(ISqlTransaction transaction, long loadId, String sourceNodeId, String fromStatus, String toStatus) - Specified by:
updateExtractRequestStatusesin interfaceIDataExtractorService
-