Class KafkaWriter
java.lang.Object
org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter
org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter
org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter
org.jumpmind.symmetric.io.data.writer.KafkaWriter
- All Implemented Interfaces:
IDataResource
,IDataWriter
-
Nested Class Summary
Nested classes/interfaces inherited from class org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter
AbstractDatabaseWriter.LoadStatus
-
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
static final String
protected final String
static final String
static final String
protected String
Fields inherited from class org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter
tablePrefix, targetPlatform, targetTransaction
Fields inherited from class org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter
CUR_DATA, currentDmlStatement, currentDmlValues, hasUncommittedDdl, isCteExpression, logSqlBuilder, platform, transaction
Fields inherited from class org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter
batch, CONFLICT_ERROR, CONFLICT_IGNORE, conflictResolver, context, isRequiresSavePointsInTransaction, lastApplyChangesOnly, lastData, lastUseConflictDetection, missingTables, sourceTable, statistics, targetColumnSourceReferencesMap, targetTable, targetTables, TRANSACTION_ABORTED, uncommittedCount, writerSettings
-
Constructor Summary
ConstructorsConstructorDescriptionKafkaWriter
(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform, String prefix, IDatabaseWriterConflictResolver conflictResolver, DatabaseWriterSettings settings, String producer, String outputFormat, String topicBy, String messageBy, String confluentUrl, String schemaPackage, String externalNodeID, String url, String loadOnlyPrefix, TypedProperties props, String runtimeConfigTablePrefix, String channelReload) -
Method Summary
Modifier and TypeMethodDescriptionprotected void
allowInsertIntoAutoIncrementColumns
(boolean value, Table table) void
batchComplete
(DataContext context) protected boolean
static byte[]
datumToByteArray
(org.apache.avro.Schema schema, org.apache.avro.generic.GenericRecord datum) protected AbstractDatabaseWriter.LoadStatus
void
protected int
Class<?>
getClassByTableName
(String tableName) getColumnName
(String dbTableName, String dbColumnName, Object bean) getTableName
(String dbTableName) protected void
logFailureDetails
(Throwable e, CsvData data, boolean logLastDmlDetails) protected Table
lookupTableAtTarget
(Table sourceTable) protected void
prepare()
protected void
int
prepareAndExecute
(String sql, CsvData data) void
sendKafkaMessage
(org.apache.kafka.clients.producer.ProducerRecord<String, Object> record) protected boolean
protected AbstractDatabaseWriter.LoadStatus
int
writeKafka
(CsvData data, Table table) Methods inherited from class org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter
close, commit, getPlatform, getPlatform, getPlatform, getTablePrefix, getTargetPlatform, getTargetTransaction, getTransaction, getTransaction, getTransaction, isLoadOnly, isSymmetricTable, isSymmetricTable, open, rollback
Methods inherited from class org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter
bindVariables, checkTargetTableHasColumns, commit, containsNullLookupKeyDataSinceLastStatement, create, dmlValuesToString, doesColumnNeedUpdated, end, getCurData, getCurrentDmlStatement, getSqlStatements, getTableKey, getWriterSettings, insert, isCteExpression, isDml, logDataTruncation, logFailureDetails, lookupTableFromCache, preprocessSqlStatement, putTableInCache, quoteString, removeGeneratedColumns, replaceCteExpression, replaceCteExpression, requireNewStatement, start, targetTableWasChangedByFilter, updateChangedDataIndicator
Methods inherited from class org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter
buildTargetColumnReferencesMap, checkForEarlyCommit, clearTargetColumnReferencesMap, filterAfter, filterBefore, filterError, getBatch, getConflictResolver, getContext, getLookupDataMap, getPkDataFor, getRowData, getSourceTable, getStatistics, getTargetColumnReferencesMap, getTargetTable, hasFilterThatHandlesMissingTable, notifyFiltersBatchCommitted, notifyFiltersBatchComplete, notifyFiltersBatchRolledback, notifyFiltersEarlyCommit, refreshTargetColumnReferencesMap, script, setConflictResolver, start, write
-
Field Details
-
KAFKA_TEXT_CACHE
-
kafkaDataMap
-
kafkaDataKey
-
KAFKA_FORMAT_XML
- See Also:
-
KAFKA_FORMAT_JSON
- See Also:
-
KAFKA_FORMAT_AVRO
- See Also:
-
KAFKA_FORMAT_CSV
- See Also:
-
KAFKA_MESSAGE_BY_BATCH
- See Also:
-
KAFKA_MESSAGE_BY_ROW
- See Also:
-
KAFKA_TOPIC_BY_TABLE
- See Also:
-
KAFKA_TOPIC_BY_CHANNEL
- See Also:
-
AVRO_CDC_SCHEMA
- See Also:
-
KAFKA_SECURITY_PROTOCOL_PLAINTEXT
- See Also:
-
KAFKA_SECURITY_PROTOCOL_SASL_PLAINTEXT
- See Also:
-
KAFKA_SECURITY_PROTOCOL_SASL_SSL
- See Also:
-
KAFKA_SECURITY_PROTOCOL_SSL
- See Also:
-
kafkaProducer
-
producerMap
-
-
Constructor Details
-
KafkaWriter
public KafkaWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform, String prefix, IDatabaseWriterConflictResolver conflictResolver, DatabaseWriterSettings settings, String producer, String outputFormat, String topicBy, String messageBy, String confluentUrl, String schemaPackage, String externalNodeID, String url, String loadOnlyPrefix, TypedProperties props, String runtimeConfigTablePrefix, String channelReload)
-
-
Method Details
-
prepare
protected void prepare()- Overrides:
prepare
in classDefaultDatabaseWriter
-
prepare
- Overrides:
prepare
in classDefaultDatabaseWriter
-
prepareAndExecute
- Overrides:
prepareAndExecute
in classDefaultDatabaseWriter
-
execute
- Overrides:
execute
in classDefaultDatabaseWriter
-
delete
- Overrides:
delete
in classDefaultDatabaseWriter
-
update
protected AbstractDatabaseWriter.LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useConflictDetection) - Overrides:
update
in classDefaultDatabaseWriter
-
create
- Overrides:
create
in classDefaultDatabaseWriter
-
sql
- Overrides:
sql
in classDefaultDatabaseWriter
-
logFailureDetails
- Overrides:
logFailureDetails
in classDefaultDatabaseWriter
-
allowInsertIntoAutoIncrementColumns
- Overrides:
allowInsertIntoAutoIncrementColumns
in classDefaultDatabaseWriter
-
lookupTableAtTarget
- Overrides:
lookupTableAtTarget
in classDefaultDatabaseWriter
-
end
- Specified by:
end
in interfaceIDataWriter
- Overrides:
end
in classDefaultDatabaseWriter
-
getTableName
-
getClassByTableName
-
getColumnName
-
sendKafkaMessage
-
datumToByteArray
public static byte[] datumToByteArray(org.apache.avro.Schema schema, org.apache.avro.generic.GenericRecord datum) throws IOException - Throws:
IOException
-
batchComplete
-
writeKafka
-