[FLINK-7195] [kafka] Remove partition list querying when restoring state in FlinkKafkaConsumer
Previously, querying the partition list and using it to filter out
restored partition states is problematic since the queried partition
list may be missing partitions due to temporary downtime of Kafka
brokers. Effectively, this caused the potential dropping of state on
restores.
This commit fixes this by completely removing partition querying if
we're restoring state (as notified by
FunctionInitializationContext.isRestored()). The subscribed partitions
will always be exactly what the restored state contains.
This closes #4357.
This closes #4344.
This closes #4301.