Proto class kafka.message.ExchangeMessage$Order not found in the classpath
I am trying to setup a mongo-kafka-connector against Apache Kafka 2.5. I am using Kafka-protobuff-connector.
ExchangeMessage.proto -------------------- syntax = "proto3"; package exchange_message_def; option java_package = "kafka.message"; option java_outer_classname = "ExchangeMessage"; option optimize_for = SPEED; message Order { string oid = 1; }
I built a jar file for this proto and placed within plugins path. But I am getting following error.
My Pom file:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.data</groupId> <artifactId>data-exchange</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <skip.tests>false</skip.tests> </properties> <dependencies> <!-- Unit testing --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <!-- Protocol Buffers --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.12.2</version> </dependency> <dependency> <groupId>com.github.os72</groupId> <artifactId>protoc-jar-maven-plugin</artifactId> <version>3.11.4</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>protoc-gen-grpc-java</artifactId> <version>1.30.2</version> <type>pom</type> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>11</source> <target>11</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>3.0.2</version> <configuration> <finalName>${project.name}-${project.version}</finalName> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> </manifest> </archive> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.5.5</version> <configuration> <appendAssemblyId>false</appendAssemblyId> <finalName>${project.name}-${project.version}</finalName> <descriptors> <descriptor>src/assembly/dist.xml</descriptor> </descriptors> </configuration> </plugin> <plugin> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <phase>process-sources</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${project.build.directory}/lib</outputDirectory> </configuration> </execution> </executions> </plugin> <plugin> <groupId>com.github.os72</groupId> <artifactId>protoc-jar-maven-plugin</artifactId> <version>3.11.4</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>run</goal> </goals> <configuration> <protocVersion>3.12.2</protocVersion> <includeDirectories> <include>src/main/proto</include> </includeDirectories> <inputDirectories> <include>src/main/proto</include> </inputDirectories> <outputTargets> <outputTarget> <type>java</type> <addSources>main</addSources> <outputDirectory>src/main/java</outputDirectory> </outputTarget> <outputTarget> <type>grpc-java</type> <addSources>main</addSources> <outputDirectory>src/main/grpc</outputDirectory> <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.30.2</pluginArtifact> </outputTarget> <outputTarget> <type>python</type> <addSources>main</addSources> <outputDirectory>src/main/python</outputDirectory> </outputTarget> <outputTarget> <type>js</type> <addSources>main</addSources> <outputDirectory>src/main/js</outputDirectory> </outputTarget> </outputTargets> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
Mongo-Sink config :
{ "name":"mongo-sink", "config":{ "name": "mongo-sink", "topics": "zz", "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "tasks.max":"1", "key.converter": "org.apache.kafka.connect.converters.IntegerConverter", "value.converter": "com.blueapron.connect.protobuf.ProtobufConverter", "value.converter.protoClassName":"kafka.message.ExchangeMessage$Order", "key.converter.schemas.enable":false, "value.converter.schemas.enable":true, "connection.uri": "mongodb://user:password@localhost:27017", "database":"data_db", "collection":"zz", "max.num.retries":"3", "retries.defer.timeout":"5000", "key.projection.type":"none", "key.projection.list": "", "value.projection.type":"none", "value.projection.list": "", "field.renamer.mapping":"[]", "field.renamer.regex":"[]", "document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy", "post.processor.chain":"com.mongodb.kafka.connect.sink.processor.DocumentIdAdder", "delete.on.null.values":false, "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy", "max.batch.size":"2", "rate.limiting.timeout":"100", "rate.limiting.every.n":"100", "change.data.capture.handler":"" } }
I am getting following error:
org.apache.kafka.connect.errors.ConnectException: Proto class kafka.message.ExchangeMessage$Order not found in the classpath at com.blueapron.connect.protobuf.ProtobufConverter.configure(ProtobufConverter.java:48) at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:293) at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:442) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1147) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:126) at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1162) at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1158) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834)